You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/16 18:32:41 UTC
[17/49] hadoop git commit: YARN-5082. Limit ContainerId increase in
fair scheduler if the num of node app reserved reached the limit (sandflee
via asuresh)
YARN-5082. Limit ContainerId increase in fair scheduler if the num of node app reserved reached the limit (sandflee via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5279af7c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5279af7c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5279af7c
Branch: refs/heads/HDFS-7240
Commit: 5279af7cd4afb090da742a96b5786d9dee6224bc
Parents: e0f4620
Author: Arun Suresh <as...@apache.org>
Authored: Fri Jun 10 22:33:42 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Jun 10 22:33:42 2016 -0700
----------------------------------------------------------------------
.../scheduler/fair/FSAppAttempt.java | 62 ++++++++++-------
.../scheduler/fair/TestFairScheduler.java | 72 ++++++++++++++++++++
2 files changed, 108 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5279af7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 5b83c9a..634f667 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -73,7 +73,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
= new DefaultResourceCalculator();
private long startTime;
- private Priority priority;
+ private Priority appPriority;
private ResourceWeights resourceWeights;
private Resource demand = Resources.createResource(0);
private FairScheduler scheduler;
@@ -107,7 +107,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
this.scheduler = scheduler;
this.startTime = scheduler.getClock().getTime();
- this.priority = Priority.newInstance(1);
+ this.appPriority = Priority.newInstance(1);
this.resourceWeights = new ResourceWeights();
}
@@ -309,7 +309,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
// default level is NODE_LOCAL
- if (! allowedLocalityLevel.containsKey(priority)) {
+ if (!allowedLocalityLevel.containsKey(priority)) {
// add the initial time of priority to prevent comparing with FsApp
// startTime and allowedLocalityLevel degrade
lastScheduledContainer.put(priority, currentTimeMs);
@@ -353,7 +353,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,
- Container container) {
+ Container reservedContainer) {
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(priority);
if (allowed != null) {
@@ -373,9 +373,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (getTotalRequiredResources(priority) <= 0) {
return null;
}
+
+ Container container = reservedContainer;
+ if (container == null) {
+ container =
+ createContainer(node, request.getCapability(), request.getPriority());
+ }
// Create RMContainer
- RMContainer rmContainer = new RMContainerImpl(container,
+ RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
@@ -485,21 +491,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* in {@link FSSchedulerNode}..
* return whether reservation was possible with the current threshold limits
*/
- private boolean reserve(Priority priority, FSSchedulerNode node,
- Container container, NodeType type, boolean alreadyReserved) {
+ private boolean reserve(ResourceRequest request, FSSchedulerNode node,
+ Container reservedContainer, NodeType type) {
+ Priority priority = request.getPriority();
if (!reservationExceedsThreshold(node, type)) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + getApplicationId());
- if (!alreadyReserved) {
- getMetrics().reserveResource(getUser(), container.getResource());
+ if (reservedContainer == null) {
+ reservedContainer =
+ createContainer(node, request.getCapability(),
+ request.getPriority());
+ getMetrics().reserveResource(getUser(),
+ reservedContainer.getResource());
RMContainer rmContainer =
- super.reserve(node, priority, null, container);
+ super.reserve(node, priority, null, reservedContainer);
node.reserveResource(this, priority, rmContainer);
setReservation(node);
} else {
RMContainer rmContainer = node.getReservedContainer();
- super.reserve(node, priority, rmContainer, container);
+ super.reserve(node, priority, rmContainer, reservedContainer);
node.reserveResource(this, priority, rmContainer);
setReservation(node);
}
@@ -615,18 +626,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// How much does the node have?
Resource available = node.getUnallocatedResource();
- Container container = null;
+ Container reservedContainer = null;
if (reserved) {
- container = node.getReservedContainer().getContainer();
- } else {
- container = createContainer(node, capability, request.getPriority());
+ reservedContainer = node.getReservedContainer().getContainer();
}
// Can we allocate a container on this node?
if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
- allocate(type, node, request.getPriority(), request, container);
+ allocate(type, node, request.getPriority(), request,
+ reservedContainer);
if (allocatedContainer == null) {
// Did the application need this resource?
if (reserved) {
@@ -647,30 +657,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// the AM. Set the amResource for this app and update the leaf queue's AM
// usage
if (!isAmRunning() && !getUnmanagedAM()) {
- setAMResource(container.getResource());
- getQueue().addAMResourceUsage(container.getResource());
+ setAMResource(capability);
+ getQueue().addAMResourceUsage(capability);
setAmRunning(true);
}
- return container.getResource();
+ return capability;
}
// The desired container won't fit here, so reserve
- if (isReservable(container) &&
- reserve(request.getPriority(), node, container, type, reserved)) {
+ if (isReservable(capability) &&
+ reserve(request, node, reservedContainer, type)) {
return FairScheduler.CONTAINER_RESERVED;
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Not creating reservation as container " + container.getId()
- + " is not reservable");
+ LOG.debug("Couldn't creating reservation for " +
+ getName() + ",at priority " + request.getPriority());
}
return Resources.none();
}
}
- private boolean isReservable(Container container) {
+ private boolean isReservable(Resource capacity) {
return scheduler.isAtLeastReservationThreshold(
- getQueue().getPolicy().getResourceCalculator(), container.getResource());
+ getQueue().getPolicy().getResourceCalculator(), capacity);
}
private boolean hasNodeOrRackLocalRequests(Priority priority) {
@@ -907,7 +917,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
public Priority getPriority() {
// Right now per-app priorities are not passed to scheduler,
// so everyone has the same priority.
- return priority;
+ return appPriority;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5279af7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 3e5a40f..ec77a9b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -4480,4 +4480,76 @@ public class TestFairScheduler extends FairSchedulerTestBase {
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm;
}
+
+ @Test(timeout = 120000)
+ public void testContainerAllocationWithContainerIdLeap() throws Exception {
+ conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add two nodes
+ RMNode node1 = MockNodes.newNodeInfo(1,
+ Resources.createResource(3072, 10), 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 = MockNodes.newNodeInfo(1,
+ Resources.createResource(3072, 10), 1, "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(2048, "queue1", "user1", 2);
+ scheduler.update();
+ scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+ scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(2048, "queue1", "user1", 1);
+ scheduler.update();
+ scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+ scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+
+ assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+ getResourceUsage().getMemory());
+
+ //container will be reserved at node1
+ RMContainer reservedContainer1 =
+ scheduler.getSchedulerNode(node1.getNodeID()).getReservedContainer();
+ assertNotEquals(reservedContainer1, null);
+ RMContainer reservedContainer2 =
+ scheduler.getSchedulerNode(node2.getNodeID()).getReservedContainer();
+ assertEquals(reservedContainer2, null);
+
+ for (int i = 0; i < 10; i++) {
+ scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+ scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+ }
+
+ // release resource
+ scheduler.handle(new AppAttemptRemovedSchedulerEvent(
+ app1, RMAppAttemptState.KILLED, false));
+
+ assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
+ getResourceUsage().getMemory());
+
+ // container will be allocated at node2
+ scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+ assertEquals(scheduler.getSchedulerApp(app2).
+ getLiveContainers().size(), 1);
+
+ long maxId = 0;
+ for (RMContainer container :
+ scheduler.getSchedulerApp(app2).getLiveContainers()) {
+ assertTrue(
+ container.getContainer().getNodeId().equals(node2.getNodeID()));
+ if (container.getContainerId().getContainerId() > maxId) {
+ maxId = container.getContainerId().getContainerId();
+ }
+ }
+
+ long reservedId = reservedContainer1.getContainerId().getContainerId();
+ assertEquals(reservedId + 1, maxId);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org