You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/08/16 02:02:18 UTC

svn commit: r1618295 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ hadoop-yarn/hadoop-yarn-server/h...

Author: jianhe
Date: Sat Aug 16 00:02:17 2014
New Revision: 1618295

URL: http://svn.apache.org/r1618295
Log:
Merge r1618294 from trunk. YARN-2389. Added functionality for schedulers to kill all applications in a queue. Contributed by Subramaniam Venkatraman Krishnan

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1618295&r1=1618294&r2=1618295&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Aug 16 00:02:17 2014
@@ -122,6 +122,9 @@ Release 2.6.0 - UNRELEASED
     YARN-1918. Typo in description and error message for 
       'yarn.resourcemanager.cluster-id' (Anandha L Ranganathan via aw)
 
+    YARN-2389. Added functionality for schedulers to kill all applications in a
+    queue. (Subramaniam Venkatraman Krishnan via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1618295&r1=1618294&r2=1618295&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sat Aug 16 00:02:17 2014
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -348,4 +350,23 @@ public abstract class AbstractYarnSchedu
           .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
     }
   }
+
+  @Override
+  public synchronized void killAllAppsInQueue(String queueName)
+      throws YarnException {
+    // check if the queue is valid
+    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
+    if (apps == null) {
+      String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    // generate kill events for each pending/running app
+    for (ApplicationAttemptId app : apps) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1618295&r1=1618294&r2=1618295&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Sat Aug 16 00:02:17 2014
@@ -212,4 +212,12 @@ public interface YarnScheduler extends E
    * @throws YarnException
    */
   void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
+
+  /**
+   * Terminate all applications in the specified queue.
+   *
+   * @param queueName the name of queue to be drained
+   * @throws YarnException if the specified queue does not exist
+   */
+  void killAllAppsInQueue(String queueName) throws YarnException;
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1618295&r1=1618294&r2=1618295&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sat Aug 16 00:02:17 2014
@@ -1074,11 +1074,10 @@ public class TestCapacityScheduler {
     queue =
         scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
             .getQueueName();
-    System.out.println(queue);
     Assert.assertTrue(queue.equals("b1"));
 
     appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInA.contains(appAttemptId));
+    assertTrue(appsInB.contains(appAttemptId));
     assertEquals(1, appsInB.size());
 
     appsInRoot = scheduler.getAppsInQueue("root");
@@ -1140,6 +1139,7 @@ public class TestCapacityScheduler {
     assertTrue(appsInA1.isEmpty());
 
     appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
     assertEquals(1, appsInA.size());
 
     appsInRoot = scheduler.getAppsInQueue("root");
@@ -1664,7 +1664,7 @@ public class TestCapacityScheduler {
     Assert.assertTrue(queue.equals("b1"));
 
     appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInA.contains(appAttemptId));
+    assertTrue(appsInB.contains(appAttemptId));
     assertEquals(1, appsInB.size());
 
     appsInRoot = scheduler.getAppsInQueue("root");
@@ -1798,4 +1798,96 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  @Test
+  public void testKillAllAppsInQueue() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    // now kill the app
+    scheduler.killAllAppsInQueue("a1");
+
+    // check postconditions
+    rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.isEmpty());
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testKillAllAppsInvalidSource() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    // now try to kill apps in a queue that does not exist
+    try {
+      scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
+      Assert.fail();
+    } catch (YarnException e) {
+      // expected
+    }
+
+    // check postconditions, app should still be in a1
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    rm.stop();
+  }
+
 }