Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/04/06 12:17:25 UTC

[hadoop] branch trunk updated: YARN-9413. Queue resource leak after app fail for CapacityScheduler. Contributed by Tao Yang.

This is an automated email from the ASF dual-hosted git repository.

wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ec143cb  YARN-9413. Queue resource leak after app fail for CapacityScheduler. Contributed by Tao Yang.
ec143cb is described below

commit ec143cbf678bd65f87fdd464c23022a2d2c54c07
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Sat Apr 6 19:59:36 2019 +0800

    YARN-9413. Queue resource leak after app fail for CapacityScheduler. Contributed by Tao Yang.
---
 .../rmapp/attempt/RMAppAttemptImpl.java            |   3 +-
 .../applicationsmanager/TestAMRestart.java         | 145 ++++++++++++++++-----
 2 files changed, 116 insertions(+), 32 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 73c0b6c..bcc52e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1502,7 +1502,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
               && !appAttempt.submissionContext.getUnmanagedAM()) {
             int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
                 .getNumFailedAppAttempts();
-            if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
+            if (appAttempt.rmApp.getMaxAppAttempts() > 1
+                && numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
               keepContainersAcrossAppAttempts = true;
             }
           }
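
For readers skimming the patch: the hunk above adjusts the code path in RMAppAttemptImpl that decides, when an attempt reaches a final state, whether its containers are kept for a subsequent attempt. The sketch below restates that decision as a stand-alone helper for illustration only; this helper does not exist in the codebase, and the parameter names are assumptions standing in for the fields the real code reads from the submission context and the RMApp.

    // Illustrative helper (not part of the commit): keep a failed attempt's
    // containers only when another attempt can actually be launched. With
    // max-app-attempts == 1 there is no next attempt, so keeping them would
    // leave the queue's used resources permanently charged, which is the
    // leak fixed by YARN-9413.
    final class KeepContainersDecision {
      static boolean shouldKeep(boolean keepContainersRequested,
          boolean unmanagedAM, int numFailedAttempts, int maxAppAttempts) {
        if (!keepContainersRequested || unmanagedAM) {
          return false;
        }
        // The guard added by this commit: require more than one allowed attempt.
        return maxAppAttempts > 1 && numFailedAttempts < maxAppAttempts;
      }
    }

With this guard in place, a single-attempt app that fails releases its containers, and the scheduler decrements the queue's used resources as usual.
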
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 9f122cb..0083f40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -60,6 +60,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -381,8 +385,10 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // AM container preempted, nm disk failure
-  // should not be counted towards AM max retry count.
+  /**
+   * AM container preemption and NM disk failure
+   * should not be counted towards the AM max retry count.
+   */
   @Test(timeout = 100000)
   public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -408,7 +414,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am1.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     ApplicationStateData appState =
         ((MemoryRMStateStore) rm1.getRMStateStore()).getState()
@@ -428,7 +434,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am2.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     MockAM am3 =
         rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
@@ -450,7 +456,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am3.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt3.shouldCountTowardsMaxAttemptRetry());
     Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
       appState.getAttempt(am3.getApplicationAttemptId())
         .getAMContainerExitStatus());
@@ -539,9 +545,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // Test RM restarts after AM container is preempted, new RM should not count
-  // AM preemption failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test RM restart after the AM container is preempted: the new RM should
+   * not count the AM preemption failure towards the max retry count and
+   * should be able to re-launch the AM.
+   */
   @Test(timeout = 60000)
   public void testPreemptedAMRestartOnRMRestart() throws Exception {
     getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
@@ -624,9 +632,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm2.stop();
   }
 
-  // Test regular RM restart/failover, new RM should not count
-  // AM failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test regular RM restart/failover: the new RM should not count
+   * the AM failure towards the max retry count and should be able to
+   * re-launch the AM.
+   */
   @Test(timeout = 50000)
   public void testRMRestartOrFailoverNotCountedForAMFailures()
       throws Exception {
@@ -944,9 +954,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // Test restarting AM launched with the KeepContainers and AM reset window.
-  // after AM reset window, even if AM who was the last is failed,
-  // all containers are launched by previous AM should be kept.
+  /**
+   * Test restarting an AM launched with keep-containers and an AM reset
+   * window. After the AM reset window elapses, even if the last AM attempt
+   * fails, all containers launched by the previous AM should be kept.
+   */
   @Test (timeout = 20000)
   public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
       throws Exception {
@@ -1014,23 +1026,25 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm1.stop();
   }
 
-  // Test to verify that the containers of previous attempt are returned in
-  // the RM response to the heartbeat of AM if these containers were not
-  // recovered by the time AM registered.
-  //
-  // 1. App is started with 2 containers running on 2 different nodes-
-  //    container 2 on the NM1 node and container 3 on the NM2 node.
-  // 2. Fail the AM of the application.
-  // 3. Simulate RM restart.
-  // 4. NM1 connects to the restarted RM immediately. It sends the RM the status
-  //    of container 2.
-  // 5. 2nd attempt of the app is launched and the app master registers with RM.
-  // 6. Verify that app master receives container 2 in the RM response to
-  //    register request.
-  // 7. NM2 connects to the RM after a delay. It sends the RM the status of
-  //    container 3.
-  // 8. Verify that the app master receives container 3 in the RM response to
-  //    its heartbeat.
+  /**
+   * Test to verify that the containers of previous attempt are returned in
+   * the RM response to the heartbeat of AM if these containers were not
+   * recovered by the time AM registered.
+   *
+   * 1. App is started with 2 containers running on 2 different nodes-
+   *    container 2 on the NM1 node and container 3 on the NM2 node.
+   * 2. Fail the AM of the application.
+   * 3. Simulate RM restart.
+   * 4. NM1 connects to the restarted RM immediately. It sends the RM the status
+   *    of container 2.
+   * 5. 2nd attempt of the app is launched and the app master registers with RM.
+   * 6. Verify that app master receives container 2 in the RM response to
+   *    register request.
+   * 7. NM2 connects to the RM after a delay. It sends the RM the status of
+   *    container 3.
+   * 8. Verify that the app master receives container 3 in the RM response to
+   *    its heartbeat.
+   */
   @Test(timeout = 200000)
   public void testContainersFromPreviousAttemptsWithRMRestart()
       throws Exception {
@@ -1167,4 +1181,73 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
     rm2.stop();
     rm1.stop();
   }
+
+  /**
+   * Test to verify that there is no queue resource leak after an app fails.
+   *
+   * 1. Submit an app which is configured to keep containers across app
+   *    attempts and which will fail once its AM fails (am-max-attempts=1).
+   * 2. The app is started with 2 containers running on the NM1 node.
+   * 3. Preempt the AM of the application; the preemption should not count
+   *    towards the max attempt retry, but the app will fail immediately.
+   * 4. Verify that the queue's used resources are cleaned up normally
+   *    after the app fails.
+   */
+  @Test(timeout = 30000)
+  public void testQueueResourceDoesNotLeak() throws Exception {
+    getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    getConf()
+        .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    MockRM rm1 = new MockRM(getConf());
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200, 0, true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    allocateContainers(nm1, am1, 1);
+
+    // Launch the 2nd container, to test that a running container is transferred.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
+        ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    // Preempt AM container
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
+        am1.getApplicationAttemptId());
+
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
+
+    // AM should not be restarted.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+
+    // After app1 fails, the used resources of this queue should
+    // be cleaned up; otherwise there is a resource leak.
+    if (getSchedulerType() == SchedulerType.CAPACITY) {
+      LeafQueue queue =
+          (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getMemorySize());
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getVirtualCores());
+    } else if (getSchedulerType() == SchedulerType.FAIR) {
+      FSLeafQueue queue = ((FairScheduler) scheduler).getQueueManager()
+          .getLeafQueue("root.default", false);
+      Assert.assertEquals(0, queue.getResourceUsage().getMemorySize());
+      Assert.assertEquals(0, queue.getResourceUsage().getVirtualCores());
+    }
+
+    rm1.stop();
+  }
 }
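
The new test, testQueueResourceDoesNotLeak, submits an app that keeps containers across attempts while only one AM attempt is allowed, which is exactly the combination the fix guards against. Outside the MockRM harness, a client would opt into the same combination through its ApplicationSubmissionContext, roughly as in the hedged sketch below; the class and the two setters are from the public YARN API, while the wrapper class and everything omitted around it are illustrative assumptions.

    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.util.Records;

    // Illustrative only: the two submission settings involved in YARN-9413.
    final class KeepContainersExample {
      static ApplicationSubmissionContext buildContext() {
        ApplicationSubmissionContext ctx =
            Records.newRecord(ApplicationSubmissionContext.class);
        // Ask the RM to keep this app's containers alive across AM attempts.
        ctx.setKeepContainersAcrossApplicationAttempts(true);
        // Only one attempt is allowed, so there is no next attempt to hand
        // containers to; before this fix the RM still marked them to be kept
        // and the queue's used resources were never released after the app
        // failed.
        ctx.setMaxAppAttempts(1);
        return ctx;
      }
    }

Before this commit, such an app left its queue's used memory and vcores non-zero after failing; with the fix, the queue-usage checks at the end of the new test pass for both the Capacity and Fair schedulers.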

