You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/01/03 21:32:20 UTC
[48/50] [abbrv] hadoop git commit: YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.
YARN-6921. Allow resource request to opt out of oversubscription in Fair Scheduler. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05b729c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05b729c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05b729c2
Branch: refs/heads/YARN-1011
Commit: 05b729c2c56b291f3c8bdfc7d6c7d3b502f219f5
Parents: 0af7353
Author: Miklos Szegedi <mi...@cloudera.com>
Authored: Wed Nov 22 09:03:05 2017 -0800
Committer: Haibo Chen <ha...@apache.org>
Committed: Wed Jan 3 12:30:03 2018 -0800
----------------------------------------------------------------------
.../scheduler/common/PendingAsk.java | 14 +++-
.../scheduler/fair/FSAppAttempt.java | 5 ++
.../LocalityAppPlacementAllocator.java | 27 ++++++-
.../scheduler/fair/FairSchedulerTestBase.java | 34 ++++++++-
.../scheduler/fair/TestFairScheduler.java | 77 ++++++++++++++++++++
5 files changed, 149 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
index 85d8715..e9931a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
@@ -29,11 +29,15 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class PendingAsk {
private final Resource perAllocationResource;
private final int count;
- public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
+ private final boolean isGuaranteedTypeEnforced;
- public PendingAsk(Resource res, int num) {
+ public final static PendingAsk ZERO =
+ new PendingAsk(Resources.none(), 0, false);
+
+ public PendingAsk(Resource res, int num, boolean guaranteedTypeEnforced) {
this.perAllocationResource = res;
this.count = num;
+ this.isGuaranteedTypeEnforced = guaranteedTypeEnforced;
}
public Resource getPerAllocationResource() {
@@ -44,11 +48,17 @@ public class PendingAsk {
return count;
}
+ public boolean isGuaranteedTypeEnforced() {
+ return isGuaranteedTypeEnforced;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("<per-allocation-resource=");
sb.append(getPerAllocationResource());
+ sb.append(", isGuaranteedEnforced=");
+ sb.append(isGuaranteedTypeEnforced());
sb.append(",repeat=");
sb.append(getCount());
sb.append(">");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 2bdac0a..8a89f78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -860,6 +860,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
boolean reserved, boolean opportunistic,
SchedulerRequestKey schedulerKey) {
+ if (pendingAsk.isGuaranteedTypeEnforced() && opportunistic) {
+ // do not attempt to assign an OPPORTUNISTIC container to a resource
+ // request that has explicitly opted out of oversubscription
+ return Resources.none();
+ }
// How much does this request need?
Resource capability = pendingAsk.getPerAllocationResource();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index 766827c..88a6acb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -159,13 +161,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
PendingAsk lastPendingAsk =
lastRequest == null ? null : new PendingAsk(
- lastRequest.getCapability(), lastRequest.getNumContainers());
+ lastRequest.getCapability(), lastRequest.getNumContainers(),
+ enforceGuaranteedExecutionType(lastRequest));
String lastRequestedNodePartition =
lastRequest == null ? null : lastRequest.getNodeLabelExpression();
updateResult = new PendingAskUpdateResult(lastPendingAsk,
new PendingAsk(request.getCapability(),
- request.getNumContainers()), lastRequestedNodePartition,
+ request.getNumContainers(),
+ enforceGuaranteedExecutionType(request)),
+ lastRequestedNodePartition,
request.getNodeLabelExpression());
}
}
@@ -192,8 +197,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
if (null == request) {
return PendingAsk.ZERO;
} else{
+ boolean guaranteedEnforced = enforceGuaranteedExecutionType(request);
return new PendingAsk(request.getCapability(),
- request.getNumContainers());
+ request.getNumContainers(), guaranteedEnforced);
}
} finally {
readLock.unlock();
@@ -201,6 +207,21 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
}
+ /**
+ * Check whether the GUARANTEED execution type of a given ResourceRequest
+ * must be enforced, i.e. the request has opted out of oversubscription.
+ * @param request resource request
+ * @return true if the request is GUARANTEED and enforces its execution
+ * type; false otherwise (including when no execution type is set)
+ */
+ private static boolean enforceGuaranteedExecutionType(
+ ResourceRequest request) {
+ ExecutionTypeRequest executionType = request.getExecutionTypeRequest();
+ return executionType != null &&
+ executionType.getExecutionType() == ExecutionType.GUARANTEED &&
+ executionType.getEnforceExecutionType();
+ }
+
@Override
public int getOutstandingAsksCount(String resourceName) {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index af4e1dd..6d9df4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -115,9 +117,15 @@ public class FairSchedulerTestBase {
relaxLocality);
}
+ protected ResourceRequest createResourceRequest(int memory, int vcores,
+ String host, int priority, int numContainers, boolean relaxLocality) {
+ return createResourceRequest(memory, vcores, host, priority,
+ numContainers, relaxLocality, false);
+ }
+
protected ResourceRequest createResourceRequest(
int memory, int vcores, String host, int priority, int numContainers,
- boolean relaxLocality) {
+ boolean relaxLocality, boolean guaranteedExecutionEnforced) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
request.setCapability(BuilderUtils.newResource(memory, vcores));
request.setResourceName(host);
@@ -127,6 +135,11 @@ public class FairSchedulerTestBase {
request.setPriority(prio);
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+ if (guaranteedExecutionEnforced) {
+ ExecutionTypeRequest executionType = ExecutionTypeRequest.newInstance(
+ ExecutionType.GUARANTEED, true);
+ request.setExecutionTypeRequest(executionType);
+ }
return request;
}
@@ -150,6 +163,13 @@ public class FairSchedulerTestBase {
}
protected ApplicationAttemptId createSchedulingRequest(
+ int memory, String queueId, String userId,
+ int numContainers, boolean guaranteedExecutionEnforced) {
+ return createSchedulingRequest(memory, 1, queueId,
+ userId, numContainers, 1, guaranteedExecutionEnforced);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
int memory, int vcores, String queueId, String userId, int numContainers) {
return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
}
@@ -163,6 +183,13 @@ public class FairSchedulerTestBase {
protected ApplicationAttemptId createSchedulingRequest(
int memory, int vcores, String queueId, String userId, int numContainers,
int priority) {
+ return createSchedulingRequest(memory, vcores, queueId, userId,
+ numContainers, priority, false);
+ }
+
+ protected ApplicationAttemptId createSchedulingRequest(
+ int memory, int vcores, String queueId, String userId, int numContainers,
+ int priority, boolean guaranteedExecutionEnforced) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
// This conditional is for testAclSubmitApplication where app is rejected
@@ -171,8 +198,9 @@ public class FairSchedulerTestBase {
scheduler.addApplicationAttempt(id, false, false);
}
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
- priority, numContainers, true);
+ ResourceRequest request = createResourceRequest(memory, vcores,
+ ResourceRequest.ANY, priority, numContainers, true,
+ guaranteedExecutionEnforced);
ask.add(request);
RMApp rmApp = mock(RMApp.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05b729c2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index e70053c..d533617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2704,6 +2704,83 @@ public class TestFairScheduler extends FairSchedulerTestBase {
getPriority().getPriority());
}
+ @Test
+ public void testResourceRequestOptOutOfOversubscription() throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ true);
+ // disable resource request normalization in fair scheduler
+ int memoryAllocationIncrement = conf.getInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ FairSchedulerConfiguration.
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+ int memoryAllocationMinimum = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+ try {
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node with 4G of memory and 4 vcores and an overallocation
+ // threshold of 0.75f and 0.75f for memory and cpu respectively
+ OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(0.75f, 0.75f));
+ MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+ Resources.createResource(4096, 4), overAllocationInfo);
+ scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+ // create a scheduling request that leaves some unallocated resources
+ ApplicationAttemptId appAttempt1 =
+ createSchedulingRequest(3600, "queue1", "user1", 1, false);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers1 =
+ scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers1.size() == 1);
+ assertEquals("unexpected container execution type",
+ ExecutionType.GUARANTEED,
+ allocatedContainers1.get(0).getExecutionType());
+
+ // node utilization is low after the container runs on the node
+ ContainerStatus containerStatus = ContainerStatus.newInstance(
+ allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+ ContainerExitStatus.SUCCESS);
+ node.updateContainersAndNodeUtilization(
+ new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+ Collections.emptyList()),
+ ResourceUtilization.newInstance(1024, 0, 0.1f));
+
+ // create another scheduling request that opts out of oversubscription and
+ // asks for more than what's left unallocated on the node.
+ ApplicationAttemptId appAttempt2 =
+ createSchedulingRequest(1536, "queue2", "user1", 1, true);
+ scheduler.handle(new NodeUpdateSchedulerEvent(node));
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+ getGuaranteedResourceUsage().getMemorySize());
+ List<Container> allocatedContainers2 =
+ scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+ assertTrue(allocatedContainers2.size() == 0);
+
+ // verify that a reservation is made for the second request
+ assertTrue("A reservation should be made for the second request",
+ scheduler.getNode(node.getNodeID()).getReservedContainer().
+ getReservedResource().equals(Resource.newInstance(1536, 1)));
+ } finally {
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+ false);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ memoryAllocationMinimum);
+ conf.setInt(
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ memoryAllocationIncrement);
+ }
+ }
+
/**
* Test that NO OPPORTUNISTIC containers can be allocated on a node that
* is fully allocated and with a very high utilization.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org