You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/04/12 23:42:34 UTC
[47/50] hadoop git commit: YARN-6432. FairScheduler: Reserve
preempted resources for corresponding applications. (Miklos Szegedi via
kasha)
YARN-6432. FairScheduler: Reserve preempted resources for corresponding applications. (Miklos Szegedi via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3375175
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3375175
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3375175
Branch: refs/heads/HDFS-7240
Commit: c3375175d616e0380560f89d491b6b9753a8f3e1
Parents: 9d9087a
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Apr 12 14:17:13 2017 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Apr 12 14:21:20 2017 -0700
----------------------------------------------------------------------
.../rmcontainer/RMContainer.java | 3 +-
.../rmcontainer/RMContainerImpl.java | 2 +-
.../scheduler/SchedulerNode.java | 2 +-
.../scheduler/fair/FSAppAttempt.java | 8 +-
.../scheduler/fair/FSPreemptionThread.java | 25 +-
.../scheduler/fair/FSSchedulerNode.java | 133 +++++-
.../scheduler/fair/FairScheduler.java | 41 +-
.../scheduler/fair/TestFSSchedulerNode.java | 403 +++++++++++++++++++
.../fair/TestFairSchedulerPreemption.java | 19 +
9 files changed, 597 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 7ad381e..29680e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -42,7 +42,8 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
* when resources are being reserved to fill space for a future container
* allocation.
*/
-public interface RMContainer extends EventHandler<RMContainerEvent> {
+public interface RMContainer extends EventHandler<RMContainerEvent>,
+ Comparable<RMContainer> {
ContainerId getContainerId();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 12fbbea..1e9463a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
-public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
+public class RMContainerImpl implements RMContainer {
private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index af4a001..272537c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -160,7 +160,7 @@ public abstract class SchedulerNode {
* @param rmContainer Allocated container
* @param launchedOnNode True if the container has been launched
*/
- private synchronized void allocateContainer(RMContainer rmContainer,
+ protected synchronized void allocateContainer(RMContainer rmContainer,
boolean launchedOnNode) {
Container container = rmContainer.getContainer();
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index e0dfb73..a1c4b4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -647,7 +647,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Container reservedContainer, NodeType type,
SchedulerRequestKey schedulerKey) {
- if (!reservationExceedsThreshold(node, type)) {
+ RMContainer nodeReservedContainer = node.getReservedContainer();
+ boolean reservableForThisApp = nodeReservedContainer == null ||
+ nodeReservedContainer.getApplicationAttemptId()
+ .equals(getApplicationAttemptId());
+ if (reservableForThisApp && !reservationExceedsThreshold(node, type)) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + getApplicationId());
if (reservedContainer == null) {
@@ -1139,7 +1143,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
/**
* Is application starved for fairshare or minshare
*/
- private boolean isStarved() {
+ boolean isStarved() {
return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 65df0c2..efe36a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -113,11 +113,6 @@ class FSPreemptionThread extends Thread {
List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
.getNodesByResourceName(rr.getResourceName());
for (FSSchedulerNode node : potentialNodes) {
- // TODO (YARN-5829): Attempt to reserve the node for starved app.
- if (isNodeAlreadyReserved(node, starvedApp)) {
- continue;
- }
-
int maxAMContainers = bestContainers == null ?
Integer.MAX_VALUE : bestContainers.numAMContainers;
PreemptableContainers preemptableContainers =
@@ -134,7 +129,8 @@ class FSPreemptionThread extends Thread {
if (bestContainers != null && bestContainers.containers.size() > 0) {
containersToPreempt.addAll(bestContainers.containers);
- trackPreemptionsAgainstNode(bestContainers.containers);
+ // Reserve the containers for the starved app
+ trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
}
}
} // End of iteration over RRs
@@ -163,8 +159,10 @@ class FSPreemptionThread extends Thread {
node.getRunningContainersWithAMsAtTheEnd();
containersToCheck.removeAll(node.getContainersForPreemption());
- // Initialize potential with unallocated resources
- Resource potential = Resources.clone(node.getUnallocatedResource());
+ // Initialize potential with unallocated but not reserved resources
+ Resource potential = Resources.subtractFromNonNegative(
+ Resources.clone(node.getUnallocatedResource()),
+ node.getTotalReserved());
for (RMContainer container : containersToCheck) {
FSAppAttempt app =
@@ -182,8 +180,6 @@ class FSPreemptionThread extends Thread {
// Check if we have already identified enough containers
if (Resources.fitsIn(request, potential)) {
return preemptableContainers;
- } else {
- // TODO (YARN-5829): Unreserve the node for the starved app.
}
}
return null;
@@ -195,10 +191,11 @@ class FSPreemptionThread extends Thread {
return nodeReservedApp != null && !nodeReservedApp.equals(app);
}
- private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+ private void trackPreemptionsAgainstNode(List<RMContainer> containers,
+ FSAppAttempt app) {
FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
.getNode(containers.get(0).getNodeId());
- node.addContainersForPreemption(containers);
+ node.addContainersForPreemption(containers, app);
}
private void preemptContainers(List<RMContainer> containers) {
@@ -232,10 +229,6 @@ class FSPreemptionThread extends Thread {
LOG.info("Killing container " + container);
scheduler.completedContainer(
container, status, RMContainerEventType.KILL);
-
- FSSchedulerNode containerNode = (FSSchedulerNode)
- scheduler.getNodeTracker().getNode(container.getAllocatedNode());
- containerNode.removeContainerForPreemption(container);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index d983ea0..663e3c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -18,18 +18,26 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -38,15 +46,38 @@ import java.util.concurrent.ConcurrentSkipListSet;
public class FSSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
-
private FSAppAttempt reservedAppSchedulable;
- private final Set<RMContainer> containersForPreemption =
+ // Stores list of containers still to be preempted
+ @VisibleForTesting
+ final Set<RMContainer> containersForPreemption =
new ConcurrentSkipListSet<>();
+ // Stores amount of resources preempted and reserved for each app
+ @VisibleForTesting
+ final Map<FSAppAttempt, Resource>
+ resourcesPreemptedForApp = new LinkedHashMap<>();
+ private final Map<ApplicationAttemptId, FSAppAttempt> appIdToAppMap =
+ new HashMap<>();
+ // Sum of resourcesPreemptedForApp values, total resources that are
+ // slated for preemption
+ private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
super(node, usePortForNodeName);
}
+ /**
+ * Total amount of reserved resources including reservations and preempted
+ * containers.
+ * @return total resources reserved
+ */
+ Resource getTotalReserved() {
+ Resource totalReserved = Resources.clone(getReservedContainer() != null
+ ? getReservedContainer().getAllocatedResource()
+ : Resource.newInstance(0, 0));
+ Resources.addTo(totalReserved, totalResourcesPreempted);
+ return totalReserved;
+ }
+
@Override
public synchronized void reserveResource(
SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey,
@@ -110,16 +141,55 @@ public class FSSchedulerNode extends SchedulerNode {
}
/**
+ * List the resources reserved for each application after preemption,
+ * in FIFO order, so they can be assigned to the appropriate applications.
+ * @return a copy of the map from starved applications to the resources
+ * preempted and reserved for them
+ */
+ @VisibleForTesting
+ synchronized LinkedHashMap<FSAppAttempt, Resource> getPreemptionList() {
+ cleanupPreemptionList();
+ return new LinkedHashMap<>(resourcesPreemptedForApp);
+ }
+
+ /**
+ * Remove apps that have their preemption requests fulfilled.
+ */
+ private synchronized void cleanupPreemptionList() {
+ Iterator<FSAppAttempt> iterator =
+ resourcesPreemptedForApp.keySet().iterator();
+ while (iterator.hasNext()) {
+ FSAppAttempt app = iterator.next();
+ if (app.isStopped() || !app.isStarved()) {
+ // App does not need more resources
+ Resources.subtractFrom(totalResourcesPreempted,
+ resourcesPreemptedForApp.get(app));
+ appIdToAppMap.remove(app.getApplicationAttemptId());
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
* Mark {@code containers} as being considered for preemption so they are
* not considered again. A call to this requires a corresponding call to
- * {@link #removeContainerForPreemption} to ensure we do not mark a
- * container for preemption and never consider it again and avoid memory
- * leaks.
+ * {@code releaseContainer} to ensure we do not mark a container for
+ * preemption and never consider it again and avoid memory leaks.
*
* @param containers container to mark
*/
- void addContainersForPreemption(Collection<RMContainer> containers) {
- containersForPreemption.addAll(containers);
+ void addContainersForPreemption(Collection<RMContainer> containers,
+ FSAppAttempt app) {
+
+ appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
+ resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0));
+ Resource appReserved = resourcesPreemptedForApp.get(app);
+
+ for(RMContainer container : containers) {
+ containersForPreemption.add(container);
+ Resources.addTo(appReserved, container.getAllocatedResource());
+ Resources.addTo(totalResourcesPreempted,
+ container.getAllocatedResource());
+ }
}
/**
@@ -130,11 +200,50 @@ public class FSSchedulerNode extends SchedulerNode {
}
/**
- * Remove container from the set of containers marked for preemption.
- *
- * @param container container to remove
+ * The Scheduler has allocated containers on this node to the given
+ * application.
+ * @param rmContainer Allocated container
+ * @param launchedOnNode True if the container has been launched
+ */
+ @Override
+ protected synchronized void allocateContainer(RMContainer rmContainer,
+ boolean launchedOnNode) {
+ super.allocateContainer(rmContainer, launchedOnNode);
+ Resource allocated = rmContainer.getAllocatedResource();
+ if (!Resources.isNone(allocated)) {
+ // check for satisfied preemption request and update bookkeeping
+ FSAppAttempt app =
+ appIdToAppMap.get(rmContainer.getApplicationAttemptId());
+ if (app != null) {
+ Resource reserved = resourcesPreemptedForApp.get(app);
+ Resource fulfilled = Resources.componentwiseMin(reserved, allocated);
+ Resources.subtractFrom(reserved, fulfilled);
+ Resources.subtractFrom(totalResourcesPreempted, fulfilled);
+ if (Resources.isNone(reserved)) {
+ // No more preempted containers
+ resourcesPreemptedForApp.remove(app);
+ appIdToAppMap.remove(rmContainer.getApplicationAttemptId());
+ }
+ }
+ } else {
+ LOG.error("Allocated empty container " + rmContainer.getContainerId());
+ }
+ }
+
+ /**
+ * Release an allocated container on this node.
+ * It also releases from the reservation list to trigger preemption
+ * allocations.
+ * @param containerId ID of container to be released.
+ * @param releasedByNode whether the release originates from a node update.
*/
- void removeContainerForPreemption(RMContainer container) {
- containersForPreemption.remove(container);
+ @Override
+ public synchronized void releaseContainer(ContainerId containerId,
+ boolean releasedByNode) {
+ RMContainer container = getContainer(containerId);
+ super.releaseContainer(containerId, releasedByNode);
+ if (container != null) {
+ containersForPreemption.remove(container);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 98c14ac..d1a237a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -71,9 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -972,6 +970,31 @@ public class FairScheduler extends
}
}
+ /**
+ * Assign preempted containers to the applications that have reserved
+ * resources for preempted containers.
+ * @param node Node to check
+ * @return assignment has occurred
+ */
+ static boolean assignPreemptedContainers(FSSchedulerNode node) {
+ boolean assignedAny = false;
+ for (Entry<FSAppAttempt, Resource> entry :
+ node.getPreemptionList().entrySet()) {
+ FSAppAttempt app = entry.getKey();
+ Resource preemptionPending = Resources.clone(entry.getValue());
+ while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
+ Resource assigned = app.assignContainer(node);
+ if (Resources.isNone(assigned)) {
+ // Fail to assign, let's not try further
+ break;
+ }
+ assignedAny = true;
+ Resources.subtractFromNonNegative(preemptionPending, assigned);
+ }
+ }
+ return assignedAny;
+ }
+
@VisibleForTesting
void attemptScheduling(FSSchedulerNode node) {
try {
@@ -991,11 +1014,17 @@ public class FairScheduler extends
}
// Assign new containers...
- // 1. Check for reserved applications
- // 2. Schedule if there are no reservations
-
- boolean validReservation = false;
+ // 1. Ensure containers are assigned to the apps that preempted
+ // 2. Check for reserved applications
+ // 3. Schedule if there are no reservations
+
+ // Apps may wait for preempted containers
+ // We have to satisfy these first to avoid cases, when we preempt
+ // a container for A from B and C gets the preempted containers,
+ // when C does not qualify for preemption itself.
+ assignPreemptedContainers(node);
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+ boolean validReservation = false;
if (reservedAppSchedulable != null) {
validReservation = reservedAppSchedulable.assignReservedContainer(node);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
new file mode 100644
index 0000000..3927b00
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test scheduler node, especially preemption reservations.
+ */
+public class TestFSSchedulerNode {
+ private final ArrayList<RMContainer> containers = new ArrayList<>();
+
+ private RMNode createNode() {
+ RMNode node = mock(RMNode.class);
+ when(node.getTotalCapability()).thenReturn(Resource.newInstance(8192, 8));
+ when(node.getHostName()).thenReturn("host.domain.com");
+ return node;
+ }
+
+ private void createDefaultContainer() {
+ createContainer(Resource.newInstance(1024, 1), null);
+ }
+
+ private RMContainer createContainer(
+ Resource request, ApplicationAttemptId appAttemptId) {
+ RMContainer container = mock(RMContainer.class);
+ Container containerInner = mock(Container.class);
+ ContainerId id = mock(ContainerId.class);
+ when(id.getContainerId()).thenReturn((long)containers.size());
+ when(containerInner.getResource()).
+ thenReturn(Resources.clone(request));
+ when(containerInner.getId()).thenReturn(id);
+ when(containerInner.getExecutionType()).
+ thenReturn(ExecutionType.GUARANTEED);
+ when(container.getApplicationAttemptId()).thenReturn(appAttemptId);
+ when(container.getContainerId()).thenReturn(id);
+ when(container.getContainer()).thenReturn(containerInner);
+ when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+ when(container.getAllocatedResource()).
+ thenReturn(Resources.clone(request));
+ containers.add(container);
+ return container;
+ }
+
+ private void saturateCluster(FSSchedulerNode schedulerNode) {
+ while (!Resources.isNone(schedulerNode.getUnallocatedResource())) {
+ createDefaultContainer();
+ schedulerNode.allocateContainer(containers.get(containers.size() - 1));
+ schedulerNode.containerStarted(containers.get(containers.size() - 1).
+ getContainerId());
+ }
+ }
+
+ private FSAppAttempt createStarvingApp(FSSchedulerNode schedulerNode,
+ Resource request) {
+ FSAppAttempt starvingApp = mock(FSAppAttempt.class);
+ ApplicationAttemptId appAttemptId =
+ mock(ApplicationAttemptId.class);
+ when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
+ when(starvingApp.assignContainer(schedulerNode)).thenAnswer(
+ new Answer<Resource>() {
+ @Override
+ public Resource answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ Resource response = Resource.newInstance(0, 0);
+ while (!Resources.isNone(request) &&
+ !Resources.isNone(schedulerNode.getUnallocatedResource())) {
+ RMContainer container = createContainer(request, appAttemptId);
+ schedulerNode.allocateContainer(container);
+ Resources.addTo(response, container.getAllocatedResource());
+ Resources.subtractFrom(request,
+ container.getAllocatedResource());
+ }
+ return response;
+ }
+ });
+ when(starvingApp.isStarved()).thenAnswer(
+ new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ return !Resources.isNone(request);
+ }
+ }
+ );
+ when(starvingApp.getPendingDemand()).thenReturn(request);
+ return starvingApp;
+ }
+
+ private void finalValidation(FSSchedulerNode schedulerNode) {
+ assertEquals("Everything should have been released",
+ Resources.none(), schedulerNode.getAllocatedResource());
+ assertTrue("No containers should be reserved for preemption",
+ schedulerNode.containersForPreemption.isEmpty());
+ assertTrue("No resources should be reserved for preemptors",
+ schedulerNode.resourcesPreemptedForApp.isEmpty());
+ assertEquals(
+ "No amount of resource should be reserved for preemptees",
+ Resources.none(),
+ schedulerNode.getTotalReserved());
+ }
+
+ private void allocateContainers(FSSchedulerNode schedulerNode) {
+ FairScheduler.assignPreemptedContainers(schedulerNode);
+ }
+
+ /**
+ * Allocate and release a single container.
+ */
+ @Test
+ public void testSimpleAllocation() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ createDefaultContainer();
+ assertEquals("Nothing should have been allocated, yet",
+ Resources.none(), schedulerNode.getAllocatedResource());
+ schedulerNode.allocateContainer(containers.get(0));
+ assertEquals("Container should be allocated",
+ containers.get(0).getContainer().getResource(),
+ schedulerNode.getAllocatedResource());
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+ assertEquals("Everything should have been released",
+ Resources.none(), schedulerNode.getAllocatedResource());
+
+ // Check that we are error prone
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+ finalValidation(schedulerNode);
+ }
+
+ /**
+ * Allocate and release three containers with launch.
+ */
+ @Test
+ public void testMultipleAllocations() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ createDefaultContainer();
+ createDefaultContainer();
+ createDefaultContainer();
+ assertEquals("Nothing should have been allocated, yet",
+ Resources.none(), schedulerNode.getAllocatedResource());
+ schedulerNode.allocateContainer(containers.get(0));
+ schedulerNode.containerStarted(containers.get(0).getContainerId());
+ schedulerNode.allocateContainer(containers.get(1));
+ schedulerNode.containerStarted(containers.get(1).getContainerId());
+ schedulerNode.allocateContainer(containers.get(2));
+ assertEquals("Container should be allocated",
+ Resources.multiply(containers.get(0).getContainer().getResource(), 3.0),
+ schedulerNode.getAllocatedResource());
+ schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
+ schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+ finalValidation(schedulerNode);
+ }
+
+ /**
+ * Preempt a single container: mark it for preemption on behalf of a
+ * starving app, verify its resources are reserved for that app, release
+ * it and check the node becomes fully allocated again.
+ */
+ @Test
+ public void testSimplePreemption() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ // Launch containers and saturate the cluster
+ saturateCluster(schedulerNode);
+ assertEquals("Container should be allocated",
+ Resources.multiply(containers.get(0).getContainer().getResource(),
+ containers.size()),
+ schedulerNode.getAllocatedResource());
+
+ // Request preemption of one container for the starving app
+ FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+ Resource.newInstance(1024, 1));
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(0)), starvingApp);
+ // The preemptee's full resource amount must be reserved for the
+ // starving app (the old message asserted the opposite of this check)
+ assertEquals(
+ "The preemptee's resources should be reserved for the starving app",
+ containers.get(0).getAllocatedResource(),
+ schedulerNode.getTotalReserved());
+
+ // Preemption occurs: release the marked container
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+ allocateContainers(schedulerNode);
+ assertEquals("Container should be allocated",
+ schedulerNode.getTotalResource(),
+ schedulerNode.getAllocatedResource());
+
+ // Release all remaining containers
+ for (int i = 1; i < containers.size(); ++i) {
+ schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+ }
+ finalValidation(schedulerNode);
+ }
+
+ /**
+ * Allocate and release three containers requested by two apps.
+ */
+ @Test
+ public void testComplexPreemption() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ // Launch containers and saturate the cluster
+ saturateCluster(schedulerNode);
+ assertEquals("Container should be allocated",
+ Resources.multiply(containers.get(0).getContainer().getResource(),
+ containers.size()),
+ schedulerNode.getAllocatedResource());
+
+ // Two apps start to starve: app1 asks for two containers' worth of
+ // resources, app2 for one
+ FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode,
+ Resource.newInstance(2048, 2));
+ FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode,
+ Resource.newInstance(1024, 1));
+
+ // Preemption thread kicks in: containers 0 and 1 are marked for app1,
+ // container 2 for app2
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(0)), starvingApp1);
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(1)), starvingApp1);
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(2)), starvingApp2);
+
+ // Preemption happens; releases arrive out of order on purpose
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+ schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
+ schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
+
+ allocateContainers(schedulerNode);
+ assertEquals("Container should be allocated",
+ schedulerNode.getTotalResource(),
+ schedulerNode.getAllocatedResource());
+
+ // Release the remaining (non-preempted) containers
+ for (int i = 3; i < containers.size(); ++i) {
+ schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+ }
+ finalValidation(schedulerNode);
+ }
+
+ /**
+ * Allocate and release three containers requested by two apps in two rounds.
+ */
+ @Test
+ public void testMultiplePreemptionEvents() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ // Launch containers and saturate the cluster
+ saturateCluster(schedulerNode);
+ assertEquals("Container should be allocated",
+ Resources.multiply(containers.get(0).getContainer().getResource(),
+ containers.size()),
+ schedulerNode.getAllocatedResource());
+
+ // Two apps start to starve: app1 asks for two containers' worth of
+ // resources, app2 for one
+ FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode,
+ Resource.newInstance(2048, 2));
+ FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode,
+ Resource.newInstance(1024, 1));
+
+ // Preemption thread kicks in: containers 0 and 1 are marked for app1,
+ // container 2 for app2
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(0)), starvingApp1);
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(1)), starvingApp1);
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(2)), starvingApp2);
+
+ // Round one: only one of app1's preemptees is released and reassigned
+ schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
+ allocateContainers(schedulerNode);
+
+ // Round two: the remaining preemptees are released and reassigned
+ schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+ allocateContainers(schedulerNode);
+
+ assertEquals("Container should be allocated",
+ schedulerNode.getTotalResource(),
+ schedulerNode.getAllocatedResource());
+
+ // Release the remaining (non-preempted) containers
+ for (int i = 3; i < containers.size(); ++i) {
+ schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+ }
+ finalValidation(schedulerNode);
+ }
+
+ /**
+ * Preempt a container for a starving app, stop that app before the
+ * reservation is satisfied, and verify the freed resources are NOT
+ * handed to the stopped app.
+ */
+ @Test
+ public void testPreemptionToCompletedApp() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ // Launch containers and saturate the cluster
+ saturateCluster(schedulerNode);
+ assertEquals("Container should be allocated",
+ Resources.multiply(containers.get(0).getContainer().getResource(),
+ containers.size()),
+ schedulerNode.getAllocatedResource());
+
+ // Preempt a container for the starving app
+ FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+ Resource.newInstance(1024, 1));
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(0)), starvingApp);
+
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+
+ // Stop the application then try to satisfy the reservation
+ // and observe that there are still free resources not allocated to
+ // the deleted app
+ when(starvingApp.isStopped()).thenReturn(true);
+ allocateContainers(schedulerNode);
+ // The node must stay partially free; the old message ("Container should
+ // be allocated") contradicted this negative assertion
+ assertNotEquals("Stopped app should not be assigned the freed resources",
+ schedulerNode.getTotalResource(),
+ schedulerNode.getAllocatedResource());
+
+ // Release all containers
+ for (int i = 1; i < containers.size(); ++i) {
+ schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+ }
+ finalValidation(schedulerNode);
+ }
+
+ /**
+ * Preempt a bigger container than the preemption request.
+ */
+ @Test
+ public void testPartialReservedPreemption() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ // Launch containers and saturate the cluster
+ saturateCluster(schedulerNode);
+ assertEquals("Container should be allocated",
+ Resources.multiply(containers.get(0).getContainer().getResource(),
+ containers.size()),
+ schedulerNode.getAllocatedResource());
+
+ // Preempt a container; the app only needs 512 MB, less than the
+ // preemptee's resource
+ Resource originalStarvingAppDemand = Resource.newInstance(512, 1);
+ FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+ originalStarvingAppDemand);
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(0)), starvingApp);
+
+ // Preemption occurs
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+
+ // Container partially reassigned: only 512 MB of the freed space is
+ // taken by the starving app, so 512 MB stays unallocated
+ allocateContainers(schedulerNode);
+ assertEquals("Container should be allocated",
+ Resources.subtract(schedulerNode.getTotalResource(),
+ Resource.newInstance(512, 0)),
+ schedulerNode.getAllocatedResource());
+
+ // Cleanup simulating node update; called for its side effect only,
+ // the returned list is intentionally ignored
+ // NOTE(review): presumably this fetch also clears the pending
+ // preemption bookkeeping — confirm against FSSchedulerNode
+ schedulerNode.getPreemptionList();
+
+ // Release all containers
+ for (int i = 1; i < containers.size(); ++i) {
+ schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+ }
+ finalValidation(schedulerNode);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3375175/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 3940a47..59d243b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -294,11 +294,30 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
8 - 2 * numStarvedAppContainers,
greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
+ // Verify the node is reserved for the starvingApp
+ for (RMNode rmNode : rmNodes) {
+ FSSchedulerNode node = (FSSchedulerNode)
+ scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+ if (node.getContainersForPreemption().size() > 0) {
+ assertTrue("node should be reserved for the starvingApp",
+ node.getPreemptionList().keySet().contains(starvingApp));
+ }
+ }
+
sendEnoughNodeUpdatesToAssignFully();
// Verify the preempted containers are assigned to starvingApp
assertEquals("Starved app is not assigned the right # of containers",
numStarvedAppContainers, starvingApp.getLiveContainers().size());
+
+ // Verify the node is not reserved for the starvingApp anymore
+ for (RMNode rmNode : rmNodes) {
+ FSSchedulerNode node = (FSSchedulerNode)
+ scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+ if (node.getContainersForPreemption().size() > 0) {
+ assertFalse(node.getPreemptionList().keySet().contains(starvingApp));
+ }
+ }
}
private void verifyNoPreemption() throws InterruptedException {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org