You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yu...@apache.org on 2017/08/02 16:43:47 UTC

hadoop git commit: YARN-6895. [FairScheduler] Preemption reservation may cause regular reservation leaks. (Miklos Szegedi via Yufei Gu)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 48899134d -> 45535f8af


YARN-6895. [FairScheduler] Preemption reservation may cause regular reservation leaks. (Miklos Szegedi via Yufei Gu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/45535f8a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/45535f8a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/45535f8a

Branch: refs/heads/trunk
Commit: 45535f8afae4e5bf4f60597fc29ba94b4e7743f3
Parents: 4889913
Author: Yufei Gu <yu...@apache.org>
Authored: Wed Aug 2 09:25:19 2017 -0700
Committer: Yufei Gu <yu...@apache.org>
Committed: Wed Aug 2 09:25:19 2017 -0700

----------------------------------------------------------------------
 .../scheduler/fair/FSAppAttempt.java            | 17 ++++-
 .../scheduler/fair/FSSchedulerNode.java         | 68 ++++++++++++++------
 .../scheduler/fair/FairScheduler.java           |  9 +--
 .../scheduler/fair/TestFSSchedulerNode.java     | 52 +++++++++++++++
 4 files changed, 119 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index a678bb9..5dfef73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -554,6 +554,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     this.minshareStarvation = Resources.none();
   }
 
+  /**
+   * Get last computed minshare starvation.
+   *
+   * @return last computed minshare starvation
+   */
+  Resource getMinshareStarvation() {
+    return minshareStarvation;
+  }
+
   void trackContainerForPreemption(RMContainer container) {
     synchronized (preemptionVariablesLock) {
       if (containersToPreempt.add(container)) {
@@ -842,7 +851,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
 
     // The desired container won't fit here, so reserve
+    // Reserve only if the app is not waiting for preempted resources on the
+    // node; otherwise we may end up with duplicate reservations
     if (isReservable(capability) &&
+        !node.isPreemptedForApp(this) &&
         reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
             type, schedulerKey)) {
       updateAMDiagnosticMsg(capability, " exceeds the available resources of "
@@ -1110,7 +1122,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
 
     if (!starved ||
-        now - lastTimeAtFairShare < getQueue().getFairSharePreemptionTimeout()) {
+        now - lastTimeAtFairShare <
+            getQueue().getFairSharePreemptionTimeout()) {
       fairshareStarvation = Resources.none();
     } else {
       // The app has been starved for longer than preemption-timeout.
@@ -1138,7 +1151,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
-   * Is application starved for fairshare or minshare
+   * Is application starved for fairshare or minshare.
    */
   boolean isStarved() {
     return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 6575e0c..93646f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,12 +36,15 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
+/**
+ * Fair Scheduler specific node features.
+ */
 @Private
 @Unstable
 public class FSSchedulerNode extends SchedulerNode {
@@ -122,7 +126,8 @@ public class FSSchedulerNode extends SchedulerNode {
       SchedulerApplicationAttempt application) {
     // Cannot unreserve for wrong application...
     ApplicationAttemptId reservedApplication = 
-        getReservedContainer().getContainer().getId().getApplicationAttemptId(); 
+        getReservedContainer().getContainer().getId()
+            .getApplicationAttemptId();
     if (!reservedApplication.equals(
         application.getApplicationAttemptId())) {
       throw new IllegalStateException("Trying to unreserve " +  
@@ -152,19 +157,36 @@ public class FSSchedulerNode extends SchedulerNode {
   }
 
   /**
+   * Returns whether a preemption is tracked on the node for the specified app.
+   * @return true if preempted containers are reserved for the app
+   */
+  synchronized boolean isPreemptedForApp(FSAppAttempt app){
+    return resourcesPreemptedForApp.containsKey(app);
+  }
+
+  /**
    * Remove apps that have their preemption requests fulfilled.
    */
-  private synchronized void cleanupPreemptionList() {
-    Iterator<Map.Entry<FSAppAttempt, Resource>> iterator =
-        resourcesPreemptedForApp.entrySet().iterator();
-    while(iterator.hasNext()) {
-      FSAppAttempt app = iterator.next().getKey();
-      if (app.isStopped() || !app.isStarved()) {
+  private void cleanupPreemptionList() {
+    // Synchronize separately to avoid potential deadlocks
+    // This may cause delayed deletion of reservations
+    LinkedList<FSAppAttempt> candidates;
+    synchronized (this) {
+      candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet());
+    }
+    for (FSAppAttempt app : candidates) {
+      if (app.isStopped() || !app.isStarved() ||
+          (Resources.isNone(app.getFairshareStarvation()) &&
+           Resources.isNone(app.getMinshareStarvation()))) {
         // App does not need more resources
-        Resources.subtractFrom(totalResourcesPreempted,
-            resourcesPreemptedForApp.get(app));
-        appIdToAppMap.remove(app.getApplicationAttemptId());
-        iterator.remove();
+        synchronized (this) {
+          Resource removed = resourcesPreemptedForApp.remove(app);
+          if (removed != null) {
+            Resources.subtractFrom(totalResourcesPreempted,
+                removed);
+            appIdToAppMap.remove(app.getApplicationAttemptId());
+          }
+        }
       }
     }
   }
@@ -180,15 +202,23 @@ public class FSSchedulerNode extends SchedulerNode {
   void addContainersForPreemption(Collection<RMContainer> containers,
                                   FSAppAttempt app) {
 
-    appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
-    resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0));
-    Resource appReserved = resourcesPreemptedForApp.get(app);
+    Resource appReserved = Resources.createResource(0);
 
     for(RMContainer container : containers) {
-      containersForPreemption.add(container);
-      Resources.addTo(appReserved, container.getAllocatedResource());
-      Resources.addTo(totalResourcesPreempted,
-          container.getAllocatedResource());
+      if(containersForPreemption.add(container)) {
+        Resources.addTo(appReserved, container.getAllocatedResource());
+      }
+    }
+
+    synchronized (this) {
+      if (!Resources.isNone(appReserved)) {
+        Resources.addTo(totalResourcesPreempted,
+            appReserved);
+        appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
+        resourcesPreemptedForApp.
+            putIfAbsent(app, Resource.newInstance(0, 0));
+        Resources.addTo(resourcesPreemptedForApp.get(app), appReserved);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index b41d3f7..db02bab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -985,25 +985,22 @@ public class FairScheduler extends
    * Assign preempted containers to the applications that have reserved
    * resources for preempted containers.
    * @param node Node to check
-   * @return assignment has occurred
    */
-  static boolean assignPreemptedContainers(FSSchedulerNode node) {
-    boolean assignedAny = false;
+  static void assignPreemptedContainers(FSSchedulerNode node) {
     for (Entry<FSAppAttempt, Resource> entry :
         node.getPreemptionList().entrySet()) {
       FSAppAttempt app = entry.getKey();
       Resource preemptionPending = Resources.clone(entry.getValue());
       while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
         Resource assigned = app.assignContainer(node);
-        if (Resources.isNone(assigned)) {
+        if (Resources.isNone(assigned) ||
+            assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
           // Fail to assign, let's not try further
           break;
         }
-        assignedAny = true;
         Resources.subtractFromNonNegative(preemptionPending, assigned);
       }
     }
-    return assignedAny;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45535f8a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
index 3927b00..0e3d344 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
@@ -31,6 +31,7 @@ import java.util.Collections;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -68,6 +69,16 @@ public class TestFSSchedulerNode {
     when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
     when(container.getAllocatedResource()).
         thenReturn(Resources.clone(request));
+    when(container.compareTo(any())).thenAnswer(new Answer<Integer>() {
+      public Integer answer(InvocationOnMock invocation) {
+        return
+            Long.compare(
+            ((RMContainer)invocation.getMock()).getContainerId()
+                .getContainerId(),
+            ((RMContainer)invocation.getArguments()[0]).getContainerId()
+                .getContainerId());
+      }
+    });
     containers.add(container);
     return container;
   }
@@ -225,6 +236,47 @@ public class TestFSSchedulerNode {
   }
 
   /**
+   * Request preemption of a single container twice, then release it.
+   */
+  @Test
+  public void testDuplicatePreemption() {
+    RMNode node = createNode();
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
+
+    // Launch containers and saturate the cluster
+    saturateCluster(schedulerNode);
+    assertEquals("Container should be allocated",
+        Resources.multiply(containers.get(0).getContainer().getResource(),
+            containers.size()),
+        schedulerNode.getAllocatedResource());
+
+    // Request preemption twice
+    FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
+        Resource.newInstance(1024, 1));
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(0)), starvingApp);
+    schedulerNode.addContainersForPreemption(
+        Collections.singletonList(containers.get(0)), starvingApp);
+    assertEquals(
+        "No resource amount should be reserved for preemptees",
+        containers.get(0).getAllocatedResource(),
+        schedulerNode.getTotalReserved());
+
+    // Preemption occurs: release one container
+    schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
+    allocateContainers(schedulerNode);
+    assertEquals("Container should be allocated",
+        schedulerNode.getTotalResource(),
+        schedulerNode.getAllocatedResource());
+
+    // Release all remaining containers
+    for (int i = 1; i < containers.size(); ++i) {
+      schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
+    }
+    finalValidation(schedulerNode);
+  }
+
+  /**
    * Allocate and release three containers requested by two apps.
    */
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org