You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/02 21:19:46 UTC
svn commit: r1554896 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn...
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Jan 2 20:19:45 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -51,7 +52,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -442,7 +442,7 @@ public class ParentQueue implements CSQu
}
@Override
- public void submitApplication(FiCaSchedulerApp application, String user,
+ public void submitApplication(ApplicationId applicationId, String user,
String queue) throws AccessControlException {
synchronized (this) {
@@ -455,57 +455,70 @@ public class ParentQueue implements CSQu
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath() +
" is STOPPED. Cannot accept submission of application: " +
- application.getApplicationId());
+ applicationId);
}
- addApplication(application, user);
+ addApplication(applicationId, user);
}
// Inform the parent queue
if (parent != null) {
try {
- parent.submitApplication(application, user, queue);
+ parent.submitApplication(applicationId, user, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
- removeApplication(application, user);
+ removeApplication(applicationId, user);
throw ace;
}
}
}
- private synchronized void addApplication(FiCaSchedulerApp application,
+
+ @Override
+ public void submitApplicationAttempt(FiCaSchedulerApp application,
+ String userName) {
+ // submit attempt logic.
+ }
+
+ @Override
+ public void finishApplicationAttempt(FiCaSchedulerApp application,
+ String queue) {
+ // finish attempt logic.
+ }
+
+ private synchronized void addApplication(ApplicationId applicationId,
String user) {
-
+
++numApplications;
LOG.info("Application added -" +
- " appId: " + application.getApplicationId() +
+ " appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
}
@Override
- public void finishApplication(FiCaSchedulerApp application, String queue) {
+ public void finishApplication(ApplicationId application, String user) {
synchronized (this) {
- removeApplication(application, application.getUser());
+ removeApplication(application, user);
}
// Inform the parent queue
if (parent != null) {
- parent.finishApplication(application, queue);
+ parent.finishApplication(application, user);
}
}
- public synchronized void removeApplication(FiCaSchedulerApp application,
+ public synchronized void removeApplication(ApplicationId applicationId,
String user) {
--numApplications;
LOG.info("Application removed -" +
- " appId: " + application.getApplicationId() +
+ " appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Thu Jan 2 20:19:45 2014
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public class FiCaSchedulerApp extends SchedulerApplication {
+public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Thu Jan 2 20:19:45 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -206,7 +206,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void reserveResource(
- SchedulerApplication application, Priority priority,
+ SchedulerApplicationAttempt application, Priority priority,
RMContainer reservedContainer) {
// Check if it's already reserved
if (this.reservedContainer != null) {
@@ -241,7 +241,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void unreserveResource(
- SchedulerApplication application) {
+ SchedulerApplicationAttempt application) {
// adding NP checks as this can now be called for preemption
if (reservedContainer != null
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1554896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Thu Jan 2 20:19:45 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class AppAddedSchedulerEvent extends SchedulerEvent {
+
+ private final ApplicationId applicationId;
+ private final String queue;
+ private final String user;
+
+ public AppAddedSchedulerEvent(
+ ApplicationId applicationId, String queue, String user) {
+ super(SchedulerEventType.APP_ADDED);
+ this.applicationId = applicationId;
+ this.queue = queue;
+ this.user = user;
+ }
+
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Thu Jan 2 20:19:45 2014
@@ -23,27 +23,14 @@ import org.apache.hadoop.yarn.api.record
public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationAttemptId applicationAttemptId;
- private final String queue;
- private final String user;
public AppAttemptAddedSchedulerEvent(
- ApplicationAttemptId applicationAttemptId, String queue, String user) {
+ ApplicationAttemptId applicationAttemptId) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
- this.queue = queue;
- this.user = user;
}
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
-
- public String getQueue() {
- return queue;
- }
-
- public String getUser() {
- return user;
- }
-
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java?rev=1554896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java Thu Jan 2 20:19:45 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+
+public class AppRemovedSchedulerEvent extends SchedulerEvent {
+
+ private final ApplicationId applicationId;
+ private final RMAppState finalState;
+
+ public AppRemovedSchedulerEvent(ApplicationId applicationId,
+ RMAppState finalState) {
+ super(SchedulerEventType.APP_REMOVED);
+ this.applicationId = applicationId;
+ this.finalState = finalState;
+ }
+
+ public ApplicationId getApplicationID() {
+ return this.applicationId;
+ }
+
+ public RMAppState getFinalState() {
+ return this.finalState;
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Thu Jan 2 20:19:45 2014
@@ -24,7 +24,11 @@ public enum SchedulerEventType {
NODE_ADDED,
NODE_REMOVED,
NODE_UPDATE,
-
+
+ // Source: RMApp
+ APP_ADDED,
+ APP_REMOVED,
+
// Source: RMAppAttempt
APP_ATTEMPT_ADDED,
APP_ATTEMPT_REMOVED,
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Thu Jan 2 20:19:45 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@Private
@Unstable
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu Jan 2 20:19:45 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public class FSSchedulerApp extends SchedulerApplication {
+public class FSSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Jan 2 20:19:45 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,10 +59,13 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -75,8 +79,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -151,10 +157,15 @@ public class FairScheduler implements Re
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
- // This stores per-application scheduling information, indexed by
+ // This stores per-application scheduling information,
+ @VisibleForTesting
+ protected Map<ApplicationId, SchedulerApplication> applications =
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+ // This stores per-application-attempt scheduling information, indexed by
// attempt ID's for fast lookup.
@VisibleForTesting
- protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
+ protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts =
new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
@@ -253,7 +264,7 @@ public class FairScheduler implements Re
private RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp application =
- applications.get(containerId.getApplicationAttemptId());
+ appAttempts.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@@ -591,44 +602,63 @@ public class FairScheduler implements Re
* user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void addApplicationAttempt(
- ApplicationAttemptId applicationAttemptId, String queueName, String user) {
+ protected synchronized void addApplication(ApplicationId applicationId,
+ String queueName, String user) {
if (queueName == null || queueName.isEmpty()) {
- String message = "Reject application " + applicationAttemptId +
+ String message = "Reject application " + applicationId +
" submitted by user " + user + " with an empty queue name.";
LOG.info(message);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
return;
}
- RMApp rmApp = rmContext.getRMApps().get(
- applicationAttemptId.getApplicationId());
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
if (queue == null) {
rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId,
+ new RMAppRejectedEvent(applicationId,
"Application rejected by queue placement policy"));
return;
}
- FSSchedulerApp schedulerApp =
- new FSSchedulerApp(applicationAttemptId, user,
- queue, new ActiveUsersManager(getRootQueueMetrics()),
- rmContext);
-
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
&& !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
String msg = "User " + userUgi.getUserName() +
- " cannot submit applications to queue " + queue.getName();
+ " cannot submit applications to queue " + queue.getName();
LOG.info(msg);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, msg));
return;
}
+
+ SchedulerApplication application =
+ new SchedulerApplication(queue, user);
+ applications.put(applicationId, application);
+
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queueName);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
+
+ /**
+ * Add a new application attempt to the scheduler.
+ */
+ protected synchronized void addApplicationAttempt(
+ ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ String user = application.getUser();
+ FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+
+ FSSchedulerApp schedulerApp =
+ new FSSchedulerApp(applicationAttemptId, user,
+ queue, new ActiveUsersManager(getRootQueueMetrics()),
+ rmContext);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
queue.addApp(schedulerApp, runnable);
@@ -639,16 +669,14 @@ public class FairScheduler implements Re
}
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
+ appAttempts.put(applicationAttemptId, schedulerApp);
- applications.put(applicationAttemptId, schedulerApp);
-
- LOG.info("Application Submission: " + applicationAttemptId +
- ", user: "+ user +
- ", currently active: " + applications.size());
-
+ LOG.info("Added Application Attempt " + applicationAttemptId
+ + " to scheduler from user: " + user + ", currently active: "
+ + appAttempts.size());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.APP_ACCEPTED));
+ RMAppAttemptEventType.ATTEMPT_ADDED));
}
@VisibleForTesting
@@ -674,13 +702,18 @@ public class FairScheduler implements Re
return queue;
}
+ private synchronized void removeApplication(ApplicationId applicationId,
+ RMAppState finalState) {
+ applications.remove(applicationId);
+ }
+
private synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- FSSchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = appAttempts.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@@ -720,7 +753,7 @@ public class FairScheduler implements Re
}
// Remove from our data-structure
- applications.remove(applicationAttemptId);
+ appAttempts.remove(applicationAttemptId);
}
/**
@@ -737,7 +770,7 @@ public class FairScheduler implements Re
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- FSSchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = appAttempts.get(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
@@ -811,7 +844,7 @@ public class FairScheduler implements Re
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
- FSSchedulerApp application = applications.get(appAttemptId);
+ FSSchedulerApp application = appAttempts.get(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@@ -882,7 +915,7 @@ public class FairScheduler implements Re
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- FSSchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = appAttempts.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@@ -1025,23 +1058,23 @@ public class FairScheduler implements Re
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
- return applications.get(appAttemptId);
+ return appAttempts.get(appAttemptId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
- if (!applications.containsKey(appAttemptId)) {
+ if (!appAttempts.containsKey(appAttemptId)) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return new SchedulerAppReport(applications.get(appAttemptId));
+ return new SchedulerAppReport(appAttempts.get(appAttemptId));
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
- FSSchedulerApp app = applications.get(appAttemptId);
+ FSSchedulerApp app = appAttempts.get(appAttemptId);
if (app == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
@@ -1090,15 +1123,29 @@ public class FairScheduler implements Re
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
+ case APP_ADDED:
+ if (!(event instanceof AppAddedSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ addApplication(appAddedEvent.getApplicationId(),
+ appAddedEvent.getQueue(), appAddedEvent.getUser());
+ break;
+ case APP_REMOVED:
+ if (!(event instanceof AppRemovedSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+ removeApplication(appRemovedEvent.getApplicationID(),
+ appRemovedEvent.getFinalState());
+ break;
case APP_ATTEMPT_ADDED:
if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- String queue = appAttemptAddedEvent.getQueue();
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- queue, appAttemptAddedEvent.getUser());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Jan 2 20:19:45 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,6 +59,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -74,12 +78,15 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -116,11 +123,15 @@ public class FifoScheduler implements Re
private Resource maximumAllocation;
private boolean usePortForNodeName;
+ @VisibleForTesting
+ protected Map<ApplicationId, SchedulerApplication> applications =
+ new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
+
// Use ConcurrentSkipListMap because applications need to be ordered
@VisibleForTesting
- protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+ protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
= new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
+
private ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
@@ -327,7 +338,7 @@ public class FifoScheduler implements Re
@VisibleForTesting
FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
- return applications.get(applicationAttemptId);
+ return appAttempts.get(applicationAttemptId);
}
@Override
@@ -347,20 +358,44 @@ public class FifoScheduler implements Re
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
-
- private synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId,
- String user) {
+
+ private synchronized void addApplication(ApplicationId applicationId,
+ String queue, String user) {
+ SchedulerApplication application =
+ new SchedulerApplication(null, user);
+ applications.put(applicationId, application);
+ LOG.info("Accepted application " + applicationId + " from user: " + user);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
+
+ private synchronized void addApplicationAttempt(
+ ApplicationAttemptId appAttemptId) {
+ SchedulerApplication application =
+ applications.get(appAttemptId.getApplicationId());
+ String user = application.getUser();
// TODO: Fix store
- FiCaSchedulerApp schedulerApp =
- new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
- this.rmContext);
- applications.put(appAttemptId, schedulerApp);
+ FiCaSchedulerApp schedulerApp =
+ new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
+ activeUsersManager, this.rmContext);
+ appAttempts.put(appAttemptId, schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
- LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
- " from " + user + ", currently active: " + applications.size());
+ LOG.info("Added Application Attempt " + appAttemptId
+ + " to scheduler from user " + application.getUser()
+ + ", currently active: " + appAttempts.size());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
- RMAppAttemptEventType.APP_ACCEPTED));
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
+
+ private synchronized void doneApplication(ApplicationId applicationId,
+ RMAppState finalState) {
+ SchedulerApplication application = applications.get(applicationId);
+
+ // Inform the activeUsersManager
+ activeUsersManager.deactivateApplication(application.getUser(),
+ applicationId);
+ applications.remove(applicationId);
}
private synchronized void doneApplicationAttempt(
@@ -382,17 +417,11 @@ public class FifoScheduler implements Re
RMContainerEventType.KILL);
}
- // Inform the activeUsersManager
- synchronized (application) {
- activeUsersManager.deactivateApplication(
- application.getUser(), application.getApplicationId());
- }
-
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
// Remove the application
- applications.remove(applicationAttemptId);
+ appAttempts.remove(applicationAttemptId);
}
/**
@@ -403,10 +432,10 @@ public class FifoScheduler implements Re
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
- " #applications=" + applications.size());
+ " #applications=" + appAttempts.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
+ for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
.entrySet()) {
FiCaSchedulerApp application = e.getValue();
LOG.debug("pre-assignContainers");
@@ -445,7 +474,7 @@ public class FifoScheduler implements Re
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (FiCaSchedulerApp application : applications.values()) {
+ for (FiCaSchedulerApp application : appAttempts.values()) {
application.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
@@ -697,12 +726,25 @@ public class FifoScheduler implements Re
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
+ case APP_ADDED:
+ {
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ addApplication(appAddedEvent.getApplicationId(),
+ appAddedEvent.getQueue(), appAddedEvent.getUser());
+ }
+ break;
+ case APP_REMOVED:
+ {
+ AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+ doneApplication(appRemovedEvent.getApplicationID(),
+ appRemovedEvent.getFinalState());
+ }
+ break;
case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getUser());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -867,8 +909,8 @@ public class FifoScheduler implements Re
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
- applications.size());
- for (FiCaSchedulerApp app : applications.values()) {
+ appAttempts.size());
+ for (FiCaSchedulerApp app : appAttempts.values()) {
apps.add(app.getApplicationAttemptId());
}
return apps;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Thu Jan 2 20:19:45 2014
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -164,11 +165,14 @@ public class Application {
final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
resourceManager.getClientRMService().submitApplication(request);
-
+
// Notify scheduler
- AppAttemptAddedSchedulerEvent appAddedEvent1 = new AppAttemptAddedSchedulerEvent(
- this.applicationAttemptId, this.queue, this.user);
- scheduler.handle(appAddedEvent1);
+ AppAddedSchedulerEvent addAppEvent =
+ new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
+ scheduler.handle(addAppEvent);
+ AppAttemptAddedSchedulerEvent addAttemptEvent =
+ new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
+ scheduler.handle(addAttemptEvent);
}
public synchronized void addResourceRequestSpec(
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Thu Jan 2 20:19:45 2014
@@ -649,7 +649,7 @@ public class TestClientRMService {
.currentTimeMillis(), "YARN"));
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
- rmContext, yarnScheduler, null, asContext, config, null);
+ rmContext, yarnScheduler, null, asContext, config);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
return app;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Thu Jan 2 20:19:45 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -297,9 +298,12 @@ public class TestFifoScheduler {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
- SchedulerEvent event1 =
- new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
- fs.handle(event1);
+ SchedulerEvent appEvent =
+ new AppAddedSchedulerEvent(appId1, "queue", "user");
+ fs.handle(appEvent);
+ SchedulerEvent attemptEvent =
+ new AppAttemptAddedSchedulerEvent(appAttemptId1);
+ fs.handle(attemptEvent);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
@@ -388,16 +392,22 @@ public class TestFifoScheduler {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
- SchedulerEvent event1 =
- new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
- fs.handle(event1);
+ SchedulerEvent appEvent =
+ new AppAddedSchedulerEvent(appId1, "queue", "user");
+ fs.handle(appEvent);
+ SchedulerEvent attemptEvent =
+ new AppAttemptAddedSchedulerEvent(appAttemptId1);
+ fs.handle(attemptEvent);
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
appId2, 1);
- SchedulerEvent event2 =
- new AppAttemptAddedSchedulerEvent(appAttemptId2, "queue", "user");
- fs.handle(event2);
+ SchedulerEvent appEvent2 =
+ new AppAddedSchedulerEvent(appId2, "queue", "user");
+ fs.handle(appEvent2);
+ SchedulerEvent attemptEvent2 =
+ new AppAttemptAddedSchedulerEvent(appAttemptId2);
+ fs.handle(attemptEvent2);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Jan 2 20:19:45 2014
@@ -248,7 +248,7 @@ public class TestRMRestart {
// verify correct number of attempts and other data
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
Assert.assertNotNull(loadedApp1);
- //Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
+ Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
Assert.assertEquals(app1.getApplicationSubmissionContext()
.getApplicationId(), loadedApp1.getApplicationSubmissionContext()
.getApplicationId());
@@ -261,7 +261,7 @@ public class TestRMRestart {
.getApplicationId());
// verify state machine kicked into expected states
- rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
// verify attempts for apps
@@ -299,7 +299,11 @@ public class TestRMRestart {
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
- Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
+ // wait for the 2nd attempt to be started.
+ int timeoutSecs = 0;
+ while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
+ Thread.sleep(200);
+ }
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
@@ -476,10 +480,10 @@ public class TestRMRestart {
Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
- // application should be in running state
- rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+ // application should be in ACCEPTED state
+ rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
- Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+ Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
// new attempt should not be started
Assert.assertEquals(2, rmApp.getAppAttempts().size());
// am1 attempt should be in FAILED state where as am2 attempt should be in
@@ -516,9 +520,9 @@ public class TestRMRestart {
nm1.setResourceTrackerService(rm3.getResourceTrackerService());
rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
- // application should be in running state
- rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
- Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
+ // application should be in ACCEPTED state
+ rm3.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(rmApp.getState(), RMAppState.ACCEPTED);
// new attempt should not be started
Assert.assertEquals(3, rmApp.getAppAttempts().size());
// am1 and am2 attempts should be in FAILED state where as am3 should be
@@ -562,6 +566,11 @@ public class TestRMRestart {
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+ // wait for the attempt to be created.
+ int timeoutSecs = 0;
+ while (rmApp.getAppAttempts().size() != 4 && timeoutSecs++ < 40) {
+ Thread.sleep(200);
+ }
Assert.assertEquals(4, rmApp.getAppAttempts().size());
Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
rm4.waitForState(latestAppAttemptId, RMAppAttemptState.SCHEDULED);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Thu Jan 2 20:19:45 2014
@@ -567,7 +567,9 @@ public class TestRMAppTransitions {
RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
- assertAppAndAttemptKilled(application);
+ sendAppUpdateSavedEvent(application);
+ assertKilled(application);
+ assertAppFinalStateSaved(application);
}
@Test
@@ -582,7 +584,7 @@ public class TestRMAppTransitions {
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
- assertAppState(RMAppState.SUBMITTED, application);
+ assertAppState(RMAppState.ACCEPTED, application);
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_ACCEPTED);
@@ -612,7 +614,9 @@ public class TestRMAppTransitions {
RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
- assertAppAndAttemptKilled(application);
+ sendAppUpdateSavedEvent(application);
+ assertKilled(application);
+ assertAppFinalStateSaved(application);
}
@Test
@@ -654,7 +658,7 @@ public class TestRMAppTransitions {
RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
rmDispatcher.await();
- assertAppState(RMAppState.SUBMITTED, application);
+ assertAppState(RMAppState.ACCEPTED, application);
appAttempt = application.getCurrentAppAttempt();
Assert.assertEquals(++expectedAttemptId,
appAttempt.getAppAttemptId().getAttemptId());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Thu Jan 2 20:19:45 2014
@@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -258,7 +257,7 @@ public class TestRMAppAttemptTransitions
application = mock(RMApp.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler,
- masterService, submissionContext, new Configuration(), user);
+ masterService, submissionContext, new Configuration());
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);
@@ -408,9 +407,6 @@ public class TestRMAppAttemptTransitions
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
-
- // Check events
- verify(application).handle(any(RMAppEvent.class));
}
/**
@@ -446,7 +442,7 @@ public class TestRMAppAttemptTransitions
assertEquals(0, applicationAttempt.getRanNodes().size());
// Check events
- verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
+ verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAttemptFinalStateSaved();
}
@@ -544,7 +540,7 @@ public class TestRMAppAttemptTransitions
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
- RMAppAttemptEventType.APP_ACCEPTED));
+ RMAppAttemptEventType.ATTEMPT_ADDED));
if(unmanagedAM){
assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
@@ -703,16 +699,6 @@ public class TestRMAppAttemptTransitions
RMAppAttemptEventType.RECOVER));
testAppAttemptRecoveredState();
}
-
- @Test
- public void testSubmittedToFailed() {
- submitApplicationAttempt();
- String message = "Rejected";
- applicationAttempt.handle(
- new RMAppAttemptRejectedEvent(
- applicationAttempt.getAppAttemptId(), message));
- testAppAttemptSubmittedToFailedState(message);
- }
@Test
public void testSubmittedToKilled() {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Thu Jan 2 20:19:45 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -58,8 +59,12 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -378,4 +383,24 @@ public class TestSchedulerUtils {
ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x");
Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
}
+
+ public static <T> SchedulerApplication verifyAppAddedAndRemovedFromScheduler(
+ final Map<ApplicationId, SchedulerApplication> applications,
+ EventHandler<SchedulerEvent> handler, String queueName) throws Exception {
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ AppAddedSchedulerEvent appAddedEvent =
+ new AppAddedSchedulerEvent(appId, queueName, "user");
+ handler.handle(appAddedEvent);
+ SchedulerApplication app = applications.get(appId);
+ // verify application is added.
+ Assert.assertNotNull(app);
+ Assert.assertEquals("user", app.getUser());
+
+ AppRemovedSchedulerEvent appRemoveEvent =
+ new AppRemovedSchedulerEvent(appId, RMAppState.FINISHED);
+ handler.handle(appRemoveEvent);
+ Assert.assertNull(applications.get(appId));
+ return app;
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Thu Jan 2 20:19:45 2014
@@ -304,7 +304,7 @@ public class TestApplicationLimits {
int APPLICATION_ID = 0;
// Submit first application
FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_0, user_0, A);
+ queue.submitApplicationAttempt(app_0, user_0);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -312,7 +312,7 @@ public class TestApplicationLimits {
// Submit second application
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_1, user_0, A);
+ queue.submitApplicationAttempt(app_1, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -320,14 +320,14 @@ public class TestApplicationLimits {
// Submit third application, should remain pending
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_2, user_0, A);
+ queue.submitApplicationAttempt(app_2, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
// Finish one application, app_2 should be activated
- queue.finishApplication(app_0, A);
+ queue.finishApplicationAttempt(app_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -335,7 +335,7 @@ public class TestApplicationLimits {
// Submit another one for user_0
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_3, user_0, A);
+ queue.submitApplicationAttempt(app_3, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -346,7 +346,7 @@ public class TestApplicationLimits {
// Submit first app for user_1
FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
- queue.submitApplication(app_4, user_1, A);
+ queue.submitApplicationAttempt(app_4, user_1);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -356,7 +356,7 @@ public class TestApplicationLimits {
// Submit second app for user_1, should block due to queue-limit
FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
- queue.submitApplication(app_5, user_1, A);
+ queue.submitApplicationAttempt(app_5, user_1);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -365,7 +365,7 @@ public class TestApplicationLimits {
assertEquals(1, queue.getNumPendingApplications(user_1));
// Now finish one app of user_1 so app_5 should be activated
- queue.finishApplication(app_4, A);
+ queue.finishApplicationAttempt(app_4, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -385,7 +385,7 @@ public class TestApplicationLimits {
// Submit first application
FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_0, user_0, A);
+ queue.submitApplicationAttempt(app_0, user_0);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -394,7 +394,7 @@ public class TestApplicationLimits {
// Submit second application
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_1, user_0, A);
+ queue.submitApplicationAttempt(app_1, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -403,7 +403,7 @@ public class TestApplicationLimits {
// Submit third application, should remain pending
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_2, user_0, A);
+ queue.submitApplicationAttempt(app_2, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -412,7 +412,7 @@ public class TestApplicationLimits {
// Submit fourth application, should remain pending
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplication(app_3, user_0, A);
+ queue.submitApplicationAttempt(app_3, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -420,7 +420,7 @@ public class TestApplicationLimits {
assertTrue(queue.pendingApplications.contains(app_3));
// Kill 3rd pending application
- queue.finishApplication(app_2, A);
+ queue.finishApplicationAttempt(app_2, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -429,7 +429,7 @@ public class TestApplicationLimits {
assertFalse(queue.activeApplications.contains(app_2));
// Finish 1st application, app_3 should become active
- queue.finishApplication(app_0, A);
+ queue.finishApplicationAttempt(app_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -439,7 +439,7 @@ public class TestApplicationLimits {
assertFalse(queue.activeApplications.contains(app_0));
// Finish 2nd application
- queue.finishApplication(app_1, A);
+ queue.finishApplicationAttempt(app_1, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -447,7 +447,7 @@ public class TestApplicationLimits {
assertFalse(queue.activeApplications.contains(app_1));
// Finish 4th application
- queue.finishApplication(app_3, A);
+ queue.finishApplicationAttempt(app_3, A);
assertEquals(0, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(0, queue.getNumActiveApplications(user_0));
@@ -507,7 +507,7 @@ public class TestApplicationLimits {
FiCaSchedulerApp app_0_0 =
spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), rmContext));
- queue.submitApplication(app_0_0, user_0, A);
+ queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
app_0_0_requests.add(
@@ -526,7 +526,7 @@ public class TestApplicationLimits {
FiCaSchedulerApp app_0_1 =
spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), rmContext));
- queue.submitApplication(app_0_1, user_0, A);
+ queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(
@@ -545,7 +545,7 @@ public class TestApplicationLimits {
FiCaSchedulerApp app_1_0 =
spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), rmContext));
- queue.submitApplication(app_1_0, user_1, A);
+ queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Thu Jan 2 20:19:45 2014
@@ -64,7 +64,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -555,9 +558,12 @@ public class TestCapacityScheduler {
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
- SchedulerEvent event =
- new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
- cs.handle(event);
+ SchedulerEvent addAppEvent =
+ new AppAddedSchedulerEvent(appId, "default", "user");
+ cs.handle(addAppEvent);
+ SchedulerEvent addAttemptEvent =
+ new AppAttemptAddedSchedulerEvent(appAttemptId);
+ cs.handle(addAttemptEvent);
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
@@ -596,10 +602,10 @@ public class TestCapacityScheduler {
public void testConcurrentAccessOnApplications() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
verifyConcurrentAccessOnApplications(
- cs.applications, FiCaSchedulerApp.class, Queue.class);
+ cs.appAttempts, FiCaSchedulerApp.class, Queue.class);
}
- public static <T extends SchedulerApplication, Q extends Queue>
+ public static <T extends SchedulerApplicationAttempt, Q extends Queue>
void verifyConcurrentAccessOnApplications(
final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
final Class<Q> queueClazz)
@@ -682,4 +688,21 @@ public class TestCapacityScheduler {
Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
}
-}
+ @Test
+ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
+
+ AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+ CapacityScheduler cs = new CapacityScheduler();
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(conf);
+ cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null,
+ null, null, new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM()));
+
+ SchedulerApplication app =
+ TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
+ cs.applications, cs, "a1");
+ Assert.assertEquals("a1", app.getQueue().getQueueName());
+ }
+ }