You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/08/16 01:53:58 UTC
svn commit: r1618294 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-s...
Author: jianhe
Date: Fri Aug 15 23:53:57 2014
New Revision: 1618294
URL: http://svn.apache.org/r1618294
Log:
YARN-2389. Added functionality for schedulers to kill all applications in a queue. Contributed by Subramaniam Venkatraman Krishnan
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1618294&r1=1618293&r2=1618294&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Aug 15 23:53:57 2014
@@ -140,6 +140,9 @@ Release 2.6.0 - UNRELEASED
YARN-1370. Fair scheduler to re-populate container allocation state.
(Anubhav Dhoot via kasha)
+ YARN-2389. Added functionality for schedulers to kill all applications in a
+ queue. (Subramaniam Venkatraman Krishnan via jianhe)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1618294&r1=1618293&r2=1618294&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Fri Aug 15 23:53:57 2014
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -348,4 +350,23 @@ public abstract class AbstractYarnSchedu
.handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
}
}
+
+ @Override
+ public synchronized void killAllAppsInQueue(String queueName)
+ throws YarnException {
+ // check if queue is a valid
+ List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
+ if (apps == null) {
+ String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+ LOG.warn(errMsg);
+ throw new YarnException(errMsg);
+ }
+ // generate kill events for each pending/running app
+ for (ApplicationAttemptId app : apps) {
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1618294&r1=1618293&r2=1618294&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Aug 15 23:53:57 2014
@@ -212,4 +212,12 @@ public interface YarnScheduler extends E
* @throws YarnException
*/
void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
+
+ /**
+ * Terminate all applications in the specified queue.
+ *
+ * @param queueName the name of queue to be drained
+ * @throws YarnException
+ */
+ void killAllAppsInQueue(String queueName) throws YarnException;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1618294&r1=1618293&r2=1618294&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Aug 15 23:53:57 2014
@@ -1074,11 +1074,10 @@ public class TestCapacityScheduler {
queue =
scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
.getQueueName();
- System.out.println(queue);
Assert.assertTrue(queue.equals("b1"));
appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInA.contains(appAttemptId));
+ assertTrue(appsInB.contains(appAttemptId));
assertEquals(1, appsInB.size());
appsInRoot = scheduler.getAppsInQueue("root");
@@ -1140,6 +1139,7 @@ public class TestCapacityScheduler {
assertTrue(appsInA1.isEmpty());
appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
assertEquals(1, appsInA.size());
appsInRoot = scheduler.getAppsInQueue("root");
@@ -1664,7 +1664,7 @@ public class TestCapacityScheduler {
Assert.assertTrue(queue.equals("b1"));
appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInA.contains(appAttemptId));
+ assertTrue(appsInB.contains(appAttemptId));
assertEquals(1, appsInB.size());
appsInRoot = scheduler.getAppsInQueue("root");
@@ -1798,4 +1798,96 @@ public class TestCapacityScheduler {
rm.stop();
}
+ @Test
+ public void testKillAllAppsInQueue() throws Exception {
+ MockRM rm = setUpMove();
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm.getResourceScheduler();
+
+ // submit an app
+ RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(1, appsInA.size());
+ String queue =
+ scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+ .getQueueName();
+ Assert.assertTrue(queue.equals("a1"));
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+
+ // now kill the app
+ scheduler.killAllAppsInQueue("a1");
+
+ // check postconditions
+ rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
+ appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.isEmpty());
+
+ appsInA1 = scheduler.getAppsInQueue("a1");
+ assertTrue(appsInA1.isEmpty());
+
+ appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.isEmpty());
+
+ rm.stop();
+ }
+
+ @Test
+ public void testKillAllAppsInvalidSource() throws Exception {
+ MockRM rm = setUpMove();
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm.getResourceScheduler();
+
+ // submit an app
+ RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(1, appsInA.size());
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+
+ // now kill the app
+ try {
+ scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
+ Assert.fail();
+ } catch (YarnException e) {
+ // expected
+ }
+
+ // check postconditions, app should still be in a1
+ appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+
+ appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(1, appsInA.size());
+
+ appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+
+ rm.stop();
+ }
+
}