You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by to...@apache.org on 2013/04/25 11:18:25 UTC
svn commit: r1475682 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
hadoop-yarn/hadoop-yarn-ser...
Author: tomwhite
Date: Thu Apr 25 09:18:24 2013
New Revision: 1475682
URL: http://svn.apache.org/r1475682
Log:
Merge -r 1475680:1475681 from trunk to branch-2. Fixes: YARN-289. Fair scheduler allows reservations that won't fit on node. Contributed by Sandy Ryza.
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Apr 25 09:18:24 2013
@@ -235,6 +235,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-605. Fix failing unit test in TestNMWebServices when versionInfo has
parantheses like when running on a git checkout. (Hitesh Shah via vinodkv)
+ YARN-289. Fair scheduler allows reservations that won't fit on node.
+ (Sandy Ryza via tomwhite)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Thu Apr 25 09:18:24 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.util.Build
@Private
@Unstable
public class AppSchedulable extends Schedulable {
+ private static final DefaultResourceCalculator RESOURCE_CALCULATOR
+ = new DefaultResourceCalculator();
+
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
@@ -180,15 +184,15 @@ public class AppSchedulable extends Sche
* update relevant bookeeping. This dispatches ro relevant handlers
* in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
*/
- private void reserve(FSSchedulerApp application, Priority priority,
- FSSchedulerNode node, Container container, boolean alreadyReserved) {
+ private void reserve(Priority priority, FSSchedulerNode node,
+ Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getHostName() +
" app_id=" + app.getApplicationId());
if (!alreadyReserved) {
- getMetrics().reserveResource(application.getUser(), container.getResource());
- RMContainer rmContainer = application.reserve(node, priority, null,
+ getMetrics().reserveResource(app.getUser(), container.getResource());
+ RMContainer rmContainer = app.reserve(node, priority, null,
container);
- node.reserveResource(application, priority, rmContainer);
+ node.reserveResource(app, priority, rmContainer);
getMetrics().reserveResource(app.getUser(),
container.getResource());
scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
@@ -197,25 +201,24 @@ public class AppSchedulable extends Sche
else {
RMContainer rmContainer = node.getReservedContainer();
- application.reserve(node, priority, rmContainer, container);
- node.reserveResource(application, priority, rmContainer);
+ app.reserve(node, priority, rmContainer, container);
+ node.reserveResource(app, priority, rmContainer);
}
}
/**
- * Remove the reservation on {@code node} for {@ application} at the given
+ * Remove the reservation on {@code node} at the given
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
* handlers for an unreservation.
*/
- private void unreserve(FSSchedulerApp application, Priority priority,
- FSSchedulerNode node) {
+ public void unreserve(Priority priority, FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
- application.unreserve(node, priority);
- node.unreserveResource(application);
+ app.unreserve(node, priority);
+ node.unreserveResource(app);
getMetrics().unreserveResource(
- application.getUser(), rmContainer.getContainer().getResource());
+ app.getUser(), rmContainer.getContainer().getResource());
scheduler.getRootQueueMetrics().unreserveResource(
- application.getUser(), rmContainer.getContainer().getResource());
+ app.getUser(), rmContainer.getContainer().getResource());
}
/**
@@ -224,8 +227,8 @@ public class AppSchedulable extends Sche
* sure the particular request should be facilitated by this node.
*/
private Resource assignContainer(FSSchedulerNode node,
- FSSchedulerApp application, Priority priority,
- ResourceRequest request, NodeType type, boolean reserved) {
+ Priority priority, ResourceRequest request, NodeType type,
+ boolean reserved) {
// How much does this request need?
Resource capability = request.getCapability();
@@ -237,7 +240,7 @@ public class AppSchedulable extends Sche
if (reserved) {
container = node.getReservedContainer().getContainer();
} else {
- container = createContainer(application, node, capability, priority);
+ container = createContainer(app, node, capability, priority);
}
// Can we allocate a container on this node?
@@ -247,9 +250,12 @@ public class AppSchedulable extends Sche
if (availableContainers > 0) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
- application.allocate(type, node, priority, request, container);
+ app.allocate(type, node, priority, request, container);
if (allocatedContainer == null) {
// Did the application need this resource?
+ if (reserved) {
+ unreserve(priority, node);
+ }
return Resources.none();
}
else {
@@ -262,17 +268,17 @@ public class AppSchedulable extends Sche
// If we had previously made a reservation, delete it
if (reserved) {
- unreserve(application, priority, node);
+ unreserve(priority, node);
}
// Inform the node
- node.allocateContainer(application.getApplicationId(),
+ node.allocateContainer(app.getApplicationId(),
allocatedContainer);
return container.getResource();
} else {
// The desired container won't fit here, so reserve
- reserve(application, priority, node, container, reserved);
+ reserve(priority, node, container, reserved);
return FairScheduler.CONTAINER_RESERVED;
}
@@ -287,7 +293,7 @@ public class AppSchedulable extends Sche
// Make sure the application still needs requests at this priority
if (app.getTotalRequiredResources(priority) == 0) {
- unreserve(app, priority, node);
+ unreserve(priority, node);
return Resources.none();
}
} else {
@@ -304,7 +310,8 @@ public class AppSchedulable extends Sche
// (not scheduled) in order to promote better locality.
synchronized (app) {
for (Priority priority : prioritiesToTry) {
- if (app.getTotalRequiredResources(priority) <= 0) {
+ if (app.getTotalRequiredResources(priority) <= 0 ||
+ !hasContainerForNode(priority, node)) {
continue;
}
@@ -321,14 +328,14 @@ public class AppSchedulable extends Sche
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
- return assignContainer(node, app, priority,
+ return assignContainer(node, priority,
localRequest, NodeType.NODE_LOCAL, reserved);
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
- return assignContainer(node, app, priority, rackLocalRequest,
+ return assignContainer(node, priority, rackLocalRequest,
NodeType.RACK_LOCAL, reserved);
}
@@ -336,7 +343,7 @@ public class AppSchedulable extends Sche
ResourceRequest.ANY);
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
- return assignContainer(node, app, priority, offSwitchRequest,
+ return assignContainer(node, priority, offSwitchRequest,
NodeType.OFF_SWITCH, reserved);
}
}
@@ -352,4 +359,16 @@ public class AppSchedulable extends Sche
public Resource assignContainer(FSSchedulerNode node) {
return assignContainer(node, false);
}
+
+ /**
+ * Whether this app has container requests that could be satisfied on the
+ * given node, if the node's total capacity were entirely free.
+ */
+ public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
+ // TODO: add checks for node-specific scheduling here
+ ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
+ return request.getNumContainers() > 0 &&
+ Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
+ request.getCapability(), node.getRMNode().getTotalCapability());
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Apr 25 09:18:24 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -805,14 +806,25 @@ public class FairScheduler implements Re
AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
- // Reservation exists; try to fulfill the reservation
- LOG.info("Trying to fulfill reservation for application "
- + reservedAppSchedulable.getApp().getApplicationAttemptId()
- + " on node: " + nm);
+ Priority reservedPriority = node.getReservedContainer().getReservedPriority();
+ if (reservedAppSchedulable != null &&
+ !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+ // Don't hold the reservation if app can no longer use it
+ LOG.info("Releasing reservation that cannot be satisfied for application "
+ + reservedAppSchedulable.getApp().getApplicationAttemptId()
+ + " on node " + nm);
+ reservedAppSchedulable.unreserve(reservedPriority, node);
+ reservedAppSchedulable = null;
+ } else {
+ // Reservation exists; try to fulfill the reservation
+ LOG.info("Trying to fulfill reservation for application "
+ + reservedAppSchedulable.getApp().getApplicationAttemptId()
+ + " on node: " + nm);
- node.getReservedAppSchedulable().assignReservedContainer(node);
+ node.getReservedAppSchedulable().assignReservedContainer(node);
+ }
}
- else {
+ if (reservedAppSchedulable == null) {
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
while (node.getReservedContainer() == null) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Apr 25 09:18:24 2013
@@ -1520,4 +1520,28 @@ public class TestFairScheduler {
}
assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
}
+
+ @Test
+ public void testReservationThatDoesntFit() {
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
+ "user1", 1);
+ scheduler.update();
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(updateEvent);
+
+ FSSchedulerApp app = scheduler.applications.get(attId);
+ assertEquals(0, app.getLiveContainers().size());
+ assertEquals(0, app.getReservedContainers().size());
+
+ createSchedulingRequestExistingApplication(1024, 2, attId);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ assertEquals(1, app.getLiveContainers().size());
+ assertEquals(0, app.getReservedContainers().size());
+ }
}