You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") was not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/02/07 21:33:02 UTC
svn commit: r1565792 [3/5] - in
/hadoop/common/branches/branch-2.3/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/record...
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Feb 7 20:33:01 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,19 +58,15 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -80,10 +75,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -121,10 +114,10 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler implements
- ResourceScheduler {
+public class FairScheduler implements ResourceScheduler {
private boolean initialized;
private FairSchedulerConfiguration conf;
+ private RMContext rmContext;
private Resource minimumAllocation;
private Resource maximumAllocation;
private Resource incrAllocation;
@@ -158,6 +151,12 @@ public class FairScheduler extends Abstr
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
+ // This stores per-application scheduling information, indexed by
+ // attempt ID's for fast lookup.
+ @VisibleForTesting
+ protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
+ new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
+
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -252,21 +251,10 @@ public class FairScheduler extends Abstr
return queueMgr;
}
- @Override
- public RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
- return (attempt == null) ? null : attempt.getRMContainer(containerId);
- }
-
- private FSSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FSSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
+ private RMContainer getRMContainer(ContainerId containerId) {
+ FSSchedulerApp application =
+ applications.get(containerId.getApplicationAttemptId());
+ return (application == null) ? null : application.getRMContainer(containerId);
}
/**
@@ -603,87 +591,64 @@ public class FairScheduler extends Abstr
* user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user) {
+ protected synchronized void addApplicationAttempt(
+ ApplicationAttemptId applicationAttemptId, String queueName, String user) {
if (queueName == null || queueName.isEmpty()) {
- String message = "Reject application " + applicationId +
+ String message = "Reject application " + applicationAttemptId +
" submitted by user " + user + " with an empty queue name.";
LOG.info(message);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppRejectedEvent(applicationId, message));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId, message));
return;
}
- RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ RMApp rmApp = rmContext.getRMApps().get(
+ applicationAttemptId.getApplicationId());
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
if (queue == null) {
rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId,
+ new RMAppAttemptRejectedEvent(applicationAttemptId,
"Application rejected by queue placement policy"));
return;
}
+ FSSchedulerApp schedulerApp =
+ new FSSchedulerApp(applicationAttemptId, user,
+ queue, new ActiveUsersManager(getRootQueueMetrics()),
+ rmContext);
+
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
&& !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
String msg = "User " + userUgi.getUserName() +
- " cannot submit applications to queue " + queue.getName();
+ " cannot submit applications to queue " + queue.getName();
LOG.info(msg);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppRejectedEvent(applicationId, msg));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
return;
}
-
- SchedulerApplication application =
- new SchedulerApplication(queue, user);
- applications.put(applicationId, application);
- queue.getMetrics().submitApp(user);
-
- LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queueName + ", currently num of applications: "
- + applications.size());
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
- }
-
- /**
- * Add a new application attempt to the scheduler.
- */
- protected synchronized void addApplicationAttempt(
- ApplicationAttemptId applicationAttemptId,
- boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
- applications.get(applicationAttemptId.getApplicationId());
- String user = application.getUser();
- FSLeafQueue queue = (FSLeafQueue) application.getQueue();
-
- FSSchedulerApp attempt =
- new FSSchedulerApp(applicationAttemptId, user,
- queue, new ActiveUsersManager(getRootQueueMetrics()),
- rmContext);
- if (transferStateFromPreviousAttempt) {
- attempt.transferStateFromPreviousAttempt(application
- .getCurrentAppAttempt());
- }
- application.setCurrentAppAttempt(attempt);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
- queue.addApp(attempt, runnable);
+ queue.addApp(schedulerApp, runnable);
if (runnable) {
- maxRunningEnforcer.trackRunnableApp(attempt);
+ maxRunningEnforcer.trackRunnableApp(schedulerApp);
} else {
- maxRunningEnforcer.trackNonRunnableApp(attempt);
+ maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
}
- queue.getMetrics().submitAppAttempt(user);
+ queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
+
+ applications.put(applicationAttemptId, schedulerApp);
+
+ LOG.info("Application Submission: " + applicationAttemptId +
+ ", user: "+ user +
+ ", currently active: " + applications.size());
- LOG.info("Added Application Attempt " + applicationAttemptId
- + " to scheduler from user: " + user);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ RMAppAttemptEventType.APP_ACCEPTED));
}
@VisibleForTesting
@@ -709,40 +674,21 @@ public class FairScheduler extends Abstr
return queue;
}
- private synchronized void removeApplication(ApplicationId applicationId,
- RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
- if (application == null){
- LOG.warn("Couldn't find application " + applicationId);
- return;
- }
- application.stop(finalState);
- applications.remove(applicationId);
- }
-
private synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
+ RMAppAttemptState rmAppAttemptFinalState) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- SchedulerApplication application =
- applications.get(applicationAttemptId.getApplicationId());
- FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
- if (attempt == null || application == null) {
+ FSSchedulerApp application = applications.get(applicationAttemptId);
+
+ if (application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
// Release all the running containers
- for (RMContainer rmContainer : attempt.getLiveContainers()) {
- if (keepContainers
- && rmContainer.getState().equals(RMContainerState.RUNNING)) {
- // do not kill the running container in the case of work-preserving AM
- // restart.
- LOG.info("Skip killing " + rmContainer.getContainerId());
- continue;
- }
+ for (RMContainer rmContainer : application.getLiveContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
@@ -751,26 +697,30 @@ public class FairScheduler extends Abstr
}
// Release all reserved containers
- for (RMContainer rmContainer : attempt.getReservedContainers()) {
+ for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
- RMContainerEventType.KILL);
+ RMContainerEventType.KILL);
}
+
// Clean up pending requests, metrics etc.
- attempt.stop(rmAppAttemptFinalState);
+ application.stop(rmAppAttemptFinalState);
// Inform the queue
- FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
+ FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
.getQueueName(), false);
- boolean wasRunnable = queue.removeApp(attempt);
+ boolean wasRunnable = queue.removeApp(application);
if (wasRunnable) {
- maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
} else {
- maxRunningEnforcer.untrackNonRunnableApp(attempt);
+ maxRunningEnforcer.untrackNonRunnableApp(application);
}
+
+ // Remove from our data-structure
+ applications.remove(applicationAttemptId);
}
/**
@@ -786,13 +736,11 @@ public class FairScheduler extends Abstr
Container container = rmContainer.getContainer();
// Get the application for the finished container
- FSSchedulerApp application =
- getCurrentAttemptForContainer(container.getId());
- ApplicationId appId =
- container.getId().getApplicationAttemptId().getApplicationId();
+ ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
+ FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
- " unknown application attempt " + appId +
+ " unknown application " + applicationAttemptId +
" completed with event " + event);
return;
}
@@ -809,9 +757,10 @@ public class FairScheduler extends Abstr
updateRootQueueMetrics();
}
- LOG.info("Application attempt " + application.getApplicationAttemptId()
- + " released container " + container.getId() + " on node: " + node
- + " with event: " + event);
+ LOG.info("Application " + applicationAttemptId +
+ " released container " + container.getId() +
+ " on node: " + node +
+ " with event: " + event);
}
private synchronized void addNode(RMNode node) {
@@ -862,7 +811,7 @@ public class FairScheduler extends Abstr
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
- FSSchedulerApp application = getSchedulerApp(appAttemptId);
+ FSSchedulerApp application = applications.get(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@@ -932,11 +881,12 @@ public class FairScheduler extends Abstr
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
- FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
+ ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+ FSSchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
+ LOG.info("Unknown application: " + applicationAttemptId +
+ " launched container " + containerId +
+ " on node: " + node);
return;
}
@@ -1075,34 +1025,28 @@ public class FairScheduler extends Abstr
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
- SchedulerApplication app =
- applications.get(appAttemptId.getApplicationId());
- if (app != null) {
- return (FSSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
+ return applications.get(appAttemptId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
- FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
- if (attempt == null) {
+ if (!applications.containsKey(appAttemptId)) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return new SchedulerAppReport(attempt);
+ return new SchedulerAppReport(applications.get(appAttemptId));
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
- FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
- if (attempt == null) {
+ FSSchedulerApp app = applications.get(appAttemptId);
+ if (app == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return attempt.getResourceUsageReport();
+ return app.getResourceUsageReport();
}
/**
@@ -1146,30 +1090,15 @@ public class FairScheduler extends Abstr
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
- case APP_ADDED:
- if (!(event instanceof AppAddedSchedulerEvent)) {
- throw new RuntimeException("Unexpected event type: " + event);
- }
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
- addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
- break;
- case APP_REMOVED:
- if (!(event instanceof AppRemovedSchedulerEvent)) {
- throw new RuntimeException("Unexpected event type: " + event);
- }
- AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
- removeApplication(appRemovedEvent.getApplicationID(),
- appRemovedEvent.getFinalState());
- break;
case APP_ATTEMPT_ADDED:
if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
+ String queue = appAttemptAddedEvent.getQueue();
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ queue, appAttemptAddedEvent.getUser());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1177,10 +1106,8 @@ public class FairScheduler extends Abstr
}
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
- removeApplicationAttempt(
- appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState(),
- appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
+ removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
+ appAttemptRemovedEvent.getFinalAttemptState());
break;
case CONTAINER_EXPIRED:
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
@@ -1231,9 +1158,6 @@ public class FairScheduler extends Abstr
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
- // This stores per-application scheduling information
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@@ -1356,4 +1280,5 @@ public class FairScheduler extends Abstr
queue.collectSchedulerApplications(apps);
return apps;
}
+
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Feb 7 20:33:01 2014
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,19 +58,14 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -80,15 +74,12 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -105,8 +96,7 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
-public class FifoScheduler extends AbstractYarnScheduler implements
- ResourceScheduler, Configurable {
+public class FifoScheduler implements ResourceScheduler, Configurable {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -117,6 +107,7 @@ public class FifoScheduler extends Abstr
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+ private RMContext rmContext;
protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
@@ -125,6 +116,11 @@ public class FifoScheduler extends Abstr
private Resource maximumAllocation;
private boolean usePortForNodeName;
+ // Use ConcurrentSkipListMap because applications need to be ordered
+ @VisibleForTesting
+ protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+ = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
+
private ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
@@ -239,9 +235,6 @@ public class FifoScheduler extends Abstr
if (!this.initialized) {
validateConf(conf);
this.rmContext = rmContext;
- //Use ConcurrentSkipListMap because applications need to be ordered
- this.applications =
- new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -266,7 +259,7 @@ public class FifoScheduler extends Abstr
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
- FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -332,114 +325,74 @@ public class FifoScheduler extends Abstr
}
@VisibleForTesting
- FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
- SchedulerApplication app =
- applications.get(applicationAttemptId.getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
+ FiCaSchedulerApp getApplication(
+ ApplicationAttemptId applicationAttemptId) {
+ return applications.get(applicationAttemptId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
-
- private synchronized void addApplication(ApplicationId applicationId,
- String queue, String user) {
- SchedulerApplication application =
- new SchedulerApplication(DEFAULT_QUEUE, user);
- applications.put(applicationId, application);
- metrics.submitApp(user);
- LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", currently num of applications: " + applications.size());
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
- }
-
- private synchronized void
- addApplicationAttempt(ApplicationAttemptId appAttemptId,
- boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
- applications.get(appAttemptId.getApplicationId());
- String user = application.getUser();
+
+ private synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId,
+ String user) {
// TODO: Fix store
- FiCaSchedulerApp schedulerApp =
- new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
- activeUsersManager, this.rmContext);
-
- if (transferStateFromPreviousAttempt) {
- schedulerApp.transferStateFromPreviousAttempt(application
- .getCurrentAppAttempt());
- }
- application.setCurrentAppAttempt(schedulerApp);
-
- metrics.submitAppAttempt(user);
- LOG.info("Added Application Attempt " + appAttemptId
- + " to scheduler from user " + application.getUser());
+ FiCaSchedulerApp schedulerApp =
+ new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
+ this.rmContext);
+ applications.put(appAttemptId, schedulerApp);
+ metrics.submitApp(user, appAttemptId.getAttemptId());
+ LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
+ " from " + user + ", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
- }
-
- private synchronized void doneApplication(ApplicationId applicationId,
- RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
- if (application == null){
- LOG.warn("Couldn't find application " + applicationId);
- return;
- }
-
- // Inform the activeUsersManager
- activeUsersManager.deactivateApplication(application.getUser(),
- applicationId);
- application.stop(finalState);
- applications.remove(applicationId);
+ RMAppAttemptEventType.APP_ACCEPTED));
}
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
+ RMAppAttemptState rmAppAttemptFinalState)
throws IOException {
- FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
- SchedulerApplication application =
- applications.get(applicationAttemptId.getApplicationId());
- if (application == null || attempt == null) {
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ if (application == null) {
throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
}
// Kill all 'live' containers
- for (RMContainer container : attempt.getLiveContainers()) {
- if (keepContainers
- && container.getState().equals(RMContainerState.RUNNING)) {
- // do not kill the running container in the case of work-preserving AM
- // restart.
- LOG.info("Skip killing " + container.getContainerId());
- continue;
- }
- containerCompleted(container,
- SchedulerUtils.createAbnormalContainerStatus(
- container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+ for (RMContainer container : application.getLiveContainers()) {
+ containerCompleted(container,
+ SchedulerUtils.createAbnormalContainerStatus(
+ container.getContainerId(),
+ SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
+ }
+
+ // Inform the activeUsersManager
+ synchronized (application) {
+ activeUsersManager.deactivateApplication(
+ application.getUser(), application.getApplicationId());
}
// Clean up pending requests, metrics etc.
- attempt.stop(rmAppAttemptFinalState);
+ application.stop(rmAppAttemptFinalState);
+
+ // Remove the application
+ applications.remove(applicationAttemptId);
}
/**
@@ -453,10 +406,9 @@ public class FifoScheduler extends Abstr
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
+ for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
.entrySet()) {
- FiCaSchedulerApp application =
- (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+ FiCaSchedulerApp application = e.getValue();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -493,10 +445,8 @@ public class FifoScheduler extends Abstr
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (SchedulerApplication application : applications.values()) {
- FiCaSchedulerApp attempt =
- (FiCaSchedulerApp) application.getCurrentAppAttempt();
- attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+ for (FiCaSchedulerApp application : applications.values()) {
+ application.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
@@ -747,26 +697,12 @@ public class FifoScheduler extends Abstr
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
- case APP_ADDED:
- {
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
- addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
- }
- break;
- case APP_REMOVED:
- {
- AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
- doneApplication(appRemovedEvent.getApplicationID(),
- appRemovedEvent.getFinalState());
- }
- break;
case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getUser());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -776,8 +712,7 @@ public class FifoScheduler extends Abstr
try {
doneApplicationAttempt(
appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState(),
- appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
+ appAttemptRemovedEvent.getFinalAttemptState());
} catch(IOException ie) {
LOG.error("Unable to remove application "
+ appAttemptRemovedEvent.getApplicationAttemptID(), ie);
@@ -803,11 +738,12 @@ public class FifoScheduler extends Abstr
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
+ ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
+ LOG.info("Unknown application: " + applicationAttemptId +
+ " launched container " + containerId +
+ " on node: " + node);
// Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
@@ -828,16 +764,14 @@ public class FifoScheduler extends Abstr
// Get the application for the finished container
Container container = rmContainer.getContainer();
- FiCaSchedulerApp application =
- getCurrentAttemptForContainer(container.getId());
- ApplicationId appId =
- container.getId().getApplicationAttemptId().getApplicationId();
+ ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
// Get the node on which the container was allocated
FiCaSchedulerNode node = getNode(container.getNodeId());
if (application == null) {
- LOG.info("Unknown application: " + appId +
+ LOG.info("Unknown application: " + applicationAttemptId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@@ -853,7 +787,7 @@ public class FifoScheduler extends Abstr
// Update total usage
Resources.subtractFrom(usedResource, container.getResource());
- LOG.info("Application attempt " + application.getApplicationAttemptId() +
+ LOG.info("Application " + applicationAttemptId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@@ -911,22 +845,11 @@ public class FifoScheduler extends Abstr
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
-
- @Override
- public RMContainer getRMContainer(ContainerId containerId) {
- FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
- return (attempt == null) ? null : attempt.getRMContainer(containerId);
- }
-
- private FiCaSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
+
+ private RMContainer getRMContainer(ContainerId containerId) {
+ FiCaSchedulerApp application =
+ getApplication(containerId.getApplicationAttemptId());
+ return (application == null) ? null : application.getRMContainer(containerId);
}
@Override
@@ -943,12 +866,12 @@ public class FifoScheduler extends Abstr
@Override
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
- List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
+ List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
applications.size());
- for (SchedulerApplication app : applications.values()) {
- attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
+ for (FiCaSchedulerApp app : applications.values()) {
+ apps.add(app.getApplicationAttemptId());
}
- return attempts;
+ return apps;
} else {
return null;
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Fri Feb 7 20:33:01 2014
@@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -165,14 +164,11 @@ public class Application {
final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
resourceManager.getClientRMService().submitApplication(request);
-
+
// Notify scheduler
- AppAddedSchedulerEvent addAppEvent =
- new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
- scheduler.handle(addAppEvent);
- AppAttemptAddedSchedulerEvent addAttemptEvent =
- new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
- scheduler.handle(addAttemptEvent);
+ AppAttemptAddedSchedulerEvent appAddedEvent1 = new AppAttemptAddedSchedulerEvent(
+ this.applicationAttemptId, this.queue, this.user);
+ scheduler.handle(appAddedEvent1);
}
public synchronized void addResourceRequestSpec(
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Fri Feb 7 20:33:01 2014
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
-import org.mortbay.log.Log;
public class MockNM {
@@ -131,13 +130,12 @@ public class MockNM {
int containerId, ContainerState containerState) throws Exception {
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
new HashMap<ApplicationId, List<ContainerStatus>>(1);
- ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
- BuilderUtils.newContainerId(attemptId, containerId), containerState,
- "Success", 0);
+ ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
+ BuilderUtils.newContainerId(attemptId, 1),
+ ContainerState.COMPLETE, "Success", 0);
ArrayList<ContainerStatus> containerStatusList =
new ArrayList<ContainerStatus>(1);
- containerStatusList.add(containerStatus);
- Log.info("ContainerStatus: " + containerStatus);
+ containerStatusList.add(amContainerStatus);
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
return nodeHeartbeat(nodeUpdate, true);
}
@@ -154,7 +152,6 @@ public class MockNM {
status.setResponseId(resId);
status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
- Log.info("entry.getValue() " + entry.getValue());
status.setContainersStatuses(entry.getValue());
}
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Fri Feb 7 20:33:01 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -41,10 +40,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -60,8 +56,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -128,33 +122,6 @@ public class MockRM extends ResourceMana
attempt.getAppAttemptState());
}
- public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
- throws Exception {
- int timeoutSecs = 0;
- while (getResourceScheduler().getRMContainer(containerId) == null
- && timeoutSecs++ < 40) {
- System.out.println("Waiting for" + containerId + " to be allocated.");
- nm.nodeHeartbeat(true);
- Thread.sleep(200);
- }
- }
-
- public void waitForState(MockNM nm, ContainerId containerId,
- RMContainerState containerState) throws Exception {
- RMContainer container = getResourceScheduler().getRMContainer(containerId);
- Assert.assertNotNull("Container shouldn't be null", container);
- int timeoutSecs = 0;
- while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
- System.out.println("Container : " + containerId + " State is : "
- + container.getState() + " Waiting for state : " + containerState);
- nm.nodeHeartbeat(true);
- Thread.sleep(300);
- }
- System.out.println("Container State is : " + container.getState());
- Assert.assertEquals("Container state is not correct (timedout)",
- containerState, container.getState());
- }
-
// get new application id
public GetNewApplicationResponse getNewAppId() throws Exception {
ApplicationClientProtocol client = getClientRMService();
@@ -205,17 +172,7 @@ public class MockRM extends ResourceMana
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
- boolean waitForAccepted)
- throws Exception {
- return submitApp(masterMemory, name, user, acls, unmanaged, queue,
- maxAppAttempts, ts, appType, waitForAccepted, false);
- }
-
- public RMApp submitApp(int masterMemory, String name, String user,
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
- int maxAppAttempts, Credentials ts, String appType,
- boolean waitForAccepted, boolean keepContainers)
- throws Exception {
+ boolean waitForAccepted) throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -225,7 +182,6 @@ public class MockRM extends ResourceMana
.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
.newRecord(ApplicationSubmissionContext.class);
- sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
@@ -465,33 +421,4 @@ public class MockRM extends ResourceMana
// override to disable webapp
}
- public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
- MockAM am) throws Exception {
- FinishApplicationMasterRequest req =
- FinishApplicationMasterRequest.newInstance(
- FinalApplicationStatus.SUCCEEDED, "", "");
- am.unregisterAppAttempt(req);
- am.waitForState(RMAppAttemptState.FINISHING);
- nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
- am.waitForState(RMAppAttemptState.FINISHED);
- rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
- }
-
- public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
- throws Exception {
- RMAppAttempt attempt = app.getCurrentAppAttempt();
- nm.nodeHeartbeat(true);
- MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
- rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
- return am;
- }
-
- public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
- throws Exception {
- MockAM am = launchAM(app, rm, nm);
- am.registerAppAttempt();
- rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
- return am;
- }
-
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Fri Feb 7 20:33:01 2014
@@ -649,7 +649,7 @@ public class TestClientRMService {
.currentTimeMillis(), "YARN"));
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
- rmContext, yarnScheduler, null, asContext, config, false);
+ rmContext, yarnScheduler, null, asContext, config, null);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
return app;
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri Feb 7 20:33:01 2014
@@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -298,12 +297,9 @@ public class TestFifoScheduler {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
- SchedulerEvent appEvent =
- new AppAddedSchedulerEvent(appId1, "queue", "user");
- fs.handle(appEvent);
- SchedulerEvent attemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
- fs.handle(attemptEvent);
+ SchedulerEvent event1 =
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
+ fs.handle(event1);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
@@ -392,22 +388,16 @@ public class TestFifoScheduler {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
- SchedulerEvent appEvent =
- new AppAddedSchedulerEvent(appId1, "queue", "user");
- fs.handle(appEvent);
- SchedulerEvent attemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
- fs.handle(attemptEvent);
+ SchedulerEvent event1 =
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
+ fs.handle(event1);
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
appId2, 1);
- SchedulerEvent appEvent2 =
- new AppAddedSchedulerEvent(appId2, "queue", "user");
- fs.handle(appEvent2);
- SchedulerEvent attemptEvent2 =
- new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
- fs.handle(attemptEvent2);
+ SchedulerEvent event2 =
+ new AppAttemptAddedSchedulerEvent(appAttemptId2, "queue", "user");
+ fs.handle(event2);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Fri Feb 7 20:33:01 2014
@@ -18,10 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.spy;
-
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@@ -33,33 +29,26 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -67,9 +56,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRM {
private static final Log LOG = LogFactory.getLog(TestRM.class);
@@ -314,8 +301,6 @@ public class TestRM {
nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
ContainerState.COMPLETE);
}
- nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,
- ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
Assert.assertFalse(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@@ -412,19 +397,19 @@ public class TestRM {
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
- MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
+ MockAM am1 = launchAM(app1, rm1, nm1);
+ finishApplicationMaster(app1, rm1, nm1, am1);
// a failed app
RMApp app2 = rm1.submitApp(200);
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+ MockAM am2 = launchAM(app2, rm1, nm1);
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
// a killed app
RMApp app3 = rm1.submitApp(200);
- MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+ MockAM am3 = launchAM(app3, rm1, nm1);
rm1.killApp(app3.getApplicationId());
rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@@ -464,7 +449,7 @@ public class TestRM {
// a failed app
RMApp app2 = rm1.submitApp(200);
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+ MockAM am2 = launchAM(app2, rm1, nm1);
nm1
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
@@ -481,77 +466,26 @@ public class TestRM {
Assert.assertEquals(-1, report1.getRpcPort());
}
- /**
- * Validate killing an application when it is at accepted state.
- * @throws Exception exception
- */
- @Test (timeout = 60000)
- public void testApplicationKillAtAcceptedState() throws Exception {
-
- YarnConfiguration conf = new YarnConfiguration();
- final Dispatcher dispatcher = new AsyncDispatcher() {
- @Override
- public EventHandler getEventHandler() {
-
- class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
- @Override
- public boolean matches(Object argument) {
- if (argument instanceof RMAppAttemptEvent) {
- if (((RMAppAttemptEvent) argument).getType().equals(
- RMAppAttemptEventType.KILL)) {
- return true;
- }
- }
- return false;
- }
- }
-
- EventHandler handler = spy(super.getEventHandler());
- doNothing().when(handler).handle(argThat(new EventArgMatcher()));
- return handler;
- }
- };
-
- MockRM rm = new MockRM(conf) {
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher;
- }
- };
-
- rm.start();
- MockNM nm1 =
- new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
- nm1.registerNode();
+ private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+ throws Exception {
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ nm.nodeHeartbeat(true);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+ return am;
+ }
- // a failed app
- RMApp application = rm.submitApp(200);
- MockAM am = MockRM.launchAM(application, rm, nm1);
- am.waitForState(RMAppAttemptState.LAUNCHED);
- nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.RUNNING);
- rm.waitForState(application.getApplicationId(), RMAppState.ACCEPTED);
-
- // Now kill the application before new attempt is launched, the app report
- // returns the invalid AM host and port.
- KillApplicationRequest request =
- KillApplicationRequest.newInstance(application.getApplicationId());
- rm.getClientRMService().forceKillApplication(request);
-
- // Specific test for YARN-1689 follows
- // Now let's say a race causes AM to register now. This should not crash RM.
- am.registerAppAttempt(false);
-
- // We explicitly intercepted the kill-event to RMAppAttempt, so app should
- // still be in KILLING state.
- rm.waitForState(application.getApplicationId(), RMAppState.KILLING);
- // AM should now be in running
- rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
-
- // Simulate that appAttempt is killed.
- rm.getRMContext().getDispatcher().getEventHandler().handle(
- new RMAppEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_KILLED));
- rm.waitForState(application.getApplicationId(), RMAppState.KILLED);
+ private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+ MockAM am) throws Exception {
+ FinishApplicationMasterRequest req =
+ FinishApplicationMasterRequest.newInstance(
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ am.unregisterAppAttempt(req);
+ am.waitForState(RMAppAttemptState.FINISHING);
+ nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+ rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
}
public static void main(String[] args) throws Exception {
@@ -559,10 +493,5 @@ public class TestRM {
t.testGetNewAppId();
t.testAppWithNoContainers();
t.testAppOnMultiNode();
- t.testNMToken();
- t.testActivatingApplicationAfterAddingNM();
- t.testInvalidateAMHostPortWhenAMFailedOrKilled();
- t.testInvalidatedAMHostPortOnAMRestart();
- t.testApplicationKillAtAcceptedState();
}
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Fri Feb 7 20:33:01 2014
@@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
@@ -180,7 +179,7 @@ public class TestRMRestart {
am1.registerAppAttempt();
// AM request for containers
- am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
+ am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
// kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
@@ -249,7 +248,7 @@ public class TestRMRestart {
// verify correct number of attempts and other data
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
Assert.assertNotNull(loadedApp1);
- Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
+ //Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
Assert.assertEquals(app1.getApplicationSubmissionContext()
.getApplicationId(), loadedApp1.getApplicationSubmissionContext()
.getApplicationId());
@@ -262,7 +261,7 @@ public class TestRMRestart {
.getApplicationId());
// verify state machine kicked into expected states
- rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
// verify attempts for apps
@@ -300,11 +299,7 @@ public class TestRMRestart {
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
- // wait for the 2nd attempt to be started.
- int timeoutSecs = 0;
- while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {;
- Thread.sleep(200);
- }
+ Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
@@ -481,10 +476,10 @@ public class TestRMRestart {
Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
- // application should be in ACCEPTED state
- rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ // application should be in running state
+ rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
- Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
+ Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
// new attempt should not be started
Assert.assertEquals(2, rmApp.getAppAttempts().size());
// am1 attempt should be in FAILED state where as am2 attempt should be in
@@ -521,9 +516,9 @@ public class TestRMRestart {
nm1.setResourceTrackerService(rm3.getResourceTrackerService());
rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
- // application should be in ACCEPTED state
- rm3.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
- Assert.assertEquals(rmApp.getState(), RMAppState.ACCEPTED);
+ // application should be in running state
+ rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+ Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
// new attempt should not be started
Assert.assertEquals(3, rmApp.getAppAttempts().size());
// am1 and am2 attempts should be in FAILED state where as am3 should be
@@ -567,11 +562,6 @@ public class TestRMRestart {
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
- // wait for the attempt to be created.
- int timeoutSecs = 0;
- while (rmApp.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
- Thread.sleep(200);
- }
Assert.assertEquals(4, rmApp.getAppAttempts().size());
Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
rm4.waitForState(latestAppAttemptId, RMAppAttemptState.SCHEDULED);
@@ -1544,128 +1534,6 @@ public class TestRMRestart {
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
}
- @SuppressWarnings("resource")
- @Test
- public void testQueueMetricsOnRMRestart() throws Exception {
- conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
- MemoryRMStateStore memStore = new MemoryRMStateStore();
- memStore.init(conf);
-
- // PHASE 1: create state in an RM
- // start RM
- MockRM rm1 = new MockRM(conf, memStore);
- rm1.start();
- MockNM nm1 =
- new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
- nm1.registerNode();
- QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics();
- resetQueueMetrics(qm1);
- assertQueueMetrics(qm1, 0, 0, 0, 0);
-
- // create app that gets launched and does allocate before RM restart
- RMApp app1 = rm1.submitApp(200);
- assertQueueMetrics(qm1, 1, 1, 0, 0);
- nm1.nodeHeartbeat(true);
- RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
- ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
- rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
- MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
- am1.registerAppAttempt();
- am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
- nm1.nodeHeartbeat(true);
- List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
- while (conts.size() == 0) {
- nm1.nodeHeartbeat(true);
- conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers());
- Thread.sleep(500);
- }
- assertQueueMetrics(qm1, 1, 0, 1, 0);
-
- // PHASE 2: create new RM and start from old state
- // create new RM to represent restart and recover state
- MockRM rm2 = new MockRM(conf, memStore);
- rm2.start();
- nm1.setResourceTrackerService(rm2.getResourceTrackerService());
- QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
- resetQueueMetrics(qm2);
- assertQueueMetrics(qm2, 0, 0, 0, 0);
- // recover app
- RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
- am1.setAMRMProtocol(rm2.getApplicationMasterService());
- am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
- nm1.nodeHeartbeat(true);
- nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
- .getCurrentAppAttempt().getAppAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
- int timeoutSecs = 0;
- while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {;
- Thread.sleep(200);
- }
-
- assertQueueMetrics(qm2, 1, 1, 0, 0);
- nm1.nodeHeartbeat(true);
- attempt1 = loadedApp1.getCurrentAppAttempt();
- attemptId1 = attempt1.getAppAttemptId();
- rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
- assertQueueMetrics(qm2, 1, 0, 1, 0);
- am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
- am1.registerAppAttempt();
- am1.allocate("127.0.0.1" , 1000, 3, new ArrayList<ContainerId>());
- nm1.nodeHeartbeat(true);
- conts = am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
- while (conts.size() == 0) {
- nm1.nodeHeartbeat(true);
- conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers());
- Thread.sleep(500);
- }
-
- // finish the AMs
- finishApplicationMaster(loadedApp1, rm2, nm1, am1);
- assertQueueMetrics(qm2, 1, 0, 0, 1);
-
- // stop RM's
- rm2.stop();
- rm1.stop();
- }
-
-
- // The metrics has some carry-on value from the previous RM, because the
- // test case is in-memory, for the same queue name (e.g. root), there's
- // always a singleton QueueMetrics object.
- private int appsSubmittedCarryOn = 0;
- private int appsPendingCarryOn = 0;
- private int appsRunningCarryOn = 0;
- private int appsCompletedCarryOn = 0;
-
- private void resetQueueMetrics(QueueMetrics qm) {
- appsSubmittedCarryOn = qm.getAppsSubmitted();
- appsPendingCarryOn = qm.getAppsPending();
- appsRunningCarryOn = qm.getAppsRunning();
- appsCompletedCarryOn = qm.getAppsCompleted();
- }
-
- private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
- int appsPending, int appsRunning, int appsCompleted) {
- Assert.assertEquals(qm.getAppsSubmitted(),
- appsSubmitted + appsSubmittedCarryOn);
- Assert.assertEquals(qm.getAppsPending(),
- appsPending + appsPendingCarryOn);
- Assert.assertEquals(qm.getAppsRunning(),
- appsRunning + appsRunningCarryOn);
- Assert.assertEquals(qm.getAppsCompleted(),
- appsCompleted + appsCompletedCarryOn);
- }
-
public class TestMemoryRMStateStore extends MemoryRMStateStore {
int count = 0;
public int updateApp = 0;
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Fri Feb 7 20:33:01 2014
@@ -164,7 +164,7 @@ public class TestResourceManager {
// Notify scheduler application is finished.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
- application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+ application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
checkResourceUsage(nm1, nm2);