You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by yu...@apache.org on 2017/08/02 16:43:47 UTC
hadoop git commit: YARN-6895. [FairScheduler] Preemption reservation may cause regular reservation leaks. (Miklos Szegedi via Yufei Gu)
Repository: hadoop
Updated Branches:
refs/heads/trunk 48899134d -> 45535f8af
YARN-6895. [FairScheduler] Preemption reservation may cause regular reservation leaks. (Miklos Szegedi via Yufei Gu)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/45535f8a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/45535f8a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/45535f8a
Branch: refs/heads/trunk
Commit: 45535f8afae4e5bf4f60597fc29ba94b4e7743f3
Parents: 4889913
Author: Yufei Gu <yu...@apache.org>
Authored: Wed Aug 2 09:25:19 2017 -0700
Committer: Yufei Gu <yu...@apache.org>
Committed: Wed Aug 2 09:25:19 2017 -0700
----------------------------------------------------------------------
.../scheduler/fair/FSAppAttempt.java | 17 ++++-
.../scheduler/fair/FSSchedulerNode.java | 68 ++++++++++++++------
.../scheduler/fair/FairScheduler.java | 9 +--
.../scheduler/fair/TestFSSchedulerNode.java | 52 +++++++++++++++
4 files changed, 119 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index a678bb9..5dfef73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -554,6 +554,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
this.minshareStarvation = Resources.none();
}
+ /**
+ * Get last computed minshare starvation.
+ *
+ * @return last computed minshare starvation
+ */
+ Resource getMinshareStarvation() {
+ return minshareStarvation;
+ }
+
void trackContainerForPreemption(RMContainer container) {
synchronized (preemptionVariablesLock) {
if (containersToPreempt.add(container)) {
@@ -842,7 +851,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
// The desired container won't fit here, so reserve
+ // Reserve only, if app does not wait for preempted resources on the node,
+ // otherwise we may end up with duplicate reservations
if (isReservable(capability) &&
+ !node.isPreemptedForApp(this) &&
reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
type, schedulerKey)) {
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
@@ -1110,7 +1122,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
if (!starved ||
- now - lastTimeAtFairShare < getQueue().getFairSharePreemptionTimeout()) {
+ now - lastTimeAtFairShare <
+ getQueue().getFairSharePreemptionTimeout()) {
fairshareStarvation = Resources.none();
} else {
// The app has been starved for longer than preemption-timeout.
@@ -1138,7 +1151,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
/**
- * Is application starved for fairshare or minshare
+ * Is application starved for fairshare or minshare.
*/
boolean isStarved() {
return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 6575e0c..93646f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,12 +36,15 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
+/**
+ * Fair Scheduler specific node features.
+ */
@Private
@Unstable
public class FSSchedulerNode extends SchedulerNode {
@@ -122,7 +126,8 @@ public class FSSchedulerNode extends SchedulerNode {
SchedulerApplicationAttempt application) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
- getReservedContainer().getContainer().getId().getApplicationAttemptId();
+ getReservedContainer().getContainer().getId()
+ .getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
@@ -152,19 +157,36 @@ public class FSSchedulerNode extends SchedulerNode {
}
/**
+ * Returns whether a preemption is tracked on the node for the specified app.
+ * @return if preempted containers are reserved for the app
+ */
+ synchronized boolean isPreemptedForApp(FSAppAttempt app){
+ return resourcesPreemptedForApp.containsKey(app);
+ }
+
+ /**
* Remove apps that have their preemption requests fulfilled.
*/
- private synchronized void cleanupPreemptionList() {
- Iterator<Map.Entry<FSAppAttempt, Resource>> iterator =
- resourcesPreemptedForApp.entrySet().iterator();
- while(iterator.hasNext()) {
- FSAppAttempt app = iterator.next().getKey();
- if (app.isStopped() || !app.isStarved()) {
+ private void cleanupPreemptionList() {
+ // Synchronize separately to avoid potential deadlocks
+ // This may cause delayed deletion of reservations
+ LinkedList<FSAppAttempt> candidates;
+ synchronized (this) {
+ candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet());
+ }
+ for (FSAppAttempt app : candidates) {
+ if (app.isStopped() || !app.isStarved() ||
+ (Resources.isNone(app.getFairshareStarvation()) &&
+ Resources.isNone(app.getMinshareStarvation()))) {
// App does not need more resources
- Resources.subtractFrom(totalResourcesPreempted,
- resourcesPreemptedForApp.get(app));
- appIdToAppMap.remove(app.getApplicationAttemptId());
- iterator.remove();
+ synchronized (this) {
+ Resource removed = resourcesPreemptedForApp.remove(app);
+ if (removed != null) {
+ Resources.subtractFrom(totalResourcesPreempted,
+ removed);
+ appIdToAppMap.remove(app.getApplicationAttemptId());
+ }
+ }
}
}
}
@@ -180,15 +202,23 @@ public class FSSchedulerNode extends SchedulerNode {
void addContainersForPreemption(Collection<RMContainer> containers,
FSAppAttempt app) {
- appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
- resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0));
- Resource appReserved = resourcesPreemptedForApp.get(app);
+ Resource appReserved = Resources.createResource(0);
for(RMContainer container : containers) {
- containersForPreemption.add(container);
- Resources.addTo(appReserved, container.getAllocatedResource());
- Resources.addTo(totalResourcesPreempted,
- container.getAllocatedResource());
+ if(containersForPreemption.add(container)) {
+ Resources.addTo(appReserved, container.getAllocatedResource());
+ }
+ }
+
+ synchronized (this) {
+ if (!Resources.isNone(appReserved)) {
+ Resources.addTo(totalResourcesPreempted,
+ appReserved);
+ appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
+ resourcesPreemptedForApp.
+ putIfAbsent(app, Resource.newInstance(0, 0));
+ Resources.addTo(resourcesPreemptedForApp.get(app), appReserved);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index b41d3f7..db02bab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -985,25 +985,22 @@ public class FairScheduler extends
* Assign preempted containers to the applications that have reserved
* resources for preempted containers.
* @param node Node to check
- * @return assignment has occurred
*/
- static boolean assignPreemptedContainers(FSSchedulerNode node) {
- boolean assignedAny = false;
+ static void assignPreemptedContainers(FSSchedulerNode node) {
for (Entry<FSAppAttempt, Resource> entry :
node.getPreemptionList().entrySet()) {
FSAppAttempt app = entry.getKey();
Resource preemptionPending = Resources.clone(entry.getValue());
while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
Resource assigned = app.assignContainer(node);
- if (Resources.isNone(assigned)) {
+ if (Resources.isNone(assigned) ||
+ assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
// Fail to assign, let's not try further
break;
}
- assignedAny = true;
Resources.subtractFromNonNegative(preemptionPending, assigned);
}
}
- return assignedAny;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
index 3927b00..0e3d344 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
@@ -31,6 +31,7 @@ import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -68,6 +69,16 @@ public class TestFSSchedulerNode {
when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
when(container.getAllocatedResource()).
thenReturn(Resources.clone(request));
+ when(container.compareTo(any())).thenAnswer(new Answer<Integer>() {
+ public Integer answer(InvocationOnMock invocation) {
+ return
+ Long.compare(
+ ((RMContainer)invocation.getMock()).getContainerId()
+ .getContainerId(),
+ ((RMContainer)invocation.getArguments()[0]).getContainerId()
+ .getContainerId());
+ }
+ });
containers.add(container);
return container;
}
@@ -225,6 +236,47 @@ public class TestFSSchedulerNode {
}
/**
+ * Allocate a single container twice and release.
+ */
+ @Test
+ public void testDuplicatePreemption() {
+ RMNode node = createNode();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+ // Launch containers and saturate the cluster
+ saturateCluster(schedulerNode);
+ assertEquals("Container should be allocated",
+ Resources.multiply(containers.get(0).getContainer().getResource(),
+ containers.size()),
+ schedulerNode.getAllocatedResource());
+
+ // Request preemption twice
+ FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+ Resource.newInstance(1024, 1));
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(0)), starvingApp);
+ schedulerNode.addContainersForPreemption(
+ Collections.singletonList(containers.get(0)), starvingApp);
+ assertEquals(
+ "No resource amount should be reserved for preemptees",
+ containers.get(0).getAllocatedResource(),
+ schedulerNode.getTotalReserved());
+
+ // Preemption occurs release one container
+ schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+ allocateContainers(schedulerNode);
+ assertEquals("Container should be allocated",
+ schedulerNode.getTotalResource(),
+ schedulerNode.getAllocatedResource());
+
+ // Release all remaining containers
+ for (int i = 1; i < containers.size(); ++i) {
+ schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+ }
+ finalValidation(schedulerNode);
+ }
+
+ /**
* Allocate and release three containers requested by two apps.
*/
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org