You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/01/15 07:06:35 UTC
svn commit: r1558303 [3/5] - in
/hadoop/common/branches/HDFS-5535/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/ma...
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Jan 15 06:06:31 2014
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -120,10 +121,10 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
-public class FairScheduler implements ResourceScheduler {
+public class FairScheduler extends AbstractYarnScheduler implements
+ ResourceScheduler {
private boolean initialized;
private FairSchedulerConfiguration conf;
- private RMContext rmContext;
private Resource minimumAllocation;
private Resource maximumAllocation;
private Resource incrAllocation;
@@ -157,17 +158,6 @@ public class FairScheduler implements Re
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
- // This stores per-application scheduling information,
- @VisibleForTesting
- protected Map<ApplicationId, SchedulerApplication> applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-
- // This stores per-application-attempt scheduling information, indexed by
- // attempt ID's for fast lookup.
- @VisibleForTesting
- protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts =
- new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
-
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -262,10 +252,21 @@ public class FairScheduler implements Re
return queueMgr;
}
- private RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp application =
- appAttempts.get(containerId.getApplicationAttemptId());
- return (application == null) ? null : application.getRMContainer(containerId);
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
+ FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+ return (attempt == null) ? null : attempt.getRMContainer(containerId);
+ }
+
+ private FSSchedulerApp getCurrentAttemptForContainer(
+ ContainerId containerId) {
+ SchedulerApplication app =
+ applications.get(containerId.getApplicationAttemptId()
+ .getApplicationId());
+ if (app != null) {
+ return (FSSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
/**
@@ -638,9 +639,11 @@ public class FairScheduler implements Re
SchedulerApplication application =
new SchedulerApplication(queue, user);
applications.put(applicationId, application);
+ queue.getMetrics().submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queueName);
+ + ", in queue: " + queueName + ", currently num of applications: "
+ + applications.size());
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
@@ -649,31 +652,35 @@ public class FairScheduler implements Re
* Add a new application attempt to the scheduler.
*/
protected synchronized void addApplicationAttempt(
- ApplicationAttemptId applicationAttemptId) {
+ ApplicationAttemptId applicationAttemptId,
+ boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
- FSSchedulerApp schedulerApp =
+ FSSchedulerApp attempt =
new FSSchedulerApp(applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
+ if (transferStateFromPreviousAttempt) {
+ attempt.transferStateFromPreviousAttempt(application
+ .getCurrentAppAttempt());
+ }
+ application.setCurrentAppAttempt(attempt);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
- queue.addApp(schedulerApp, runnable);
+ queue.addApp(attempt, runnable);
if (runnable) {
- maxRunningEnforcer.trackRunnableApp(schedulerApp);
+ maxRunningEnforcer.trackRunnableApp(attempt);
} else {
- maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
+ maxRunningEnforcer.trackNonRunnableApp(attempt);
}
- queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
- appAttempts.put(applicationAttemptId, schedulerApp);
+ queue.getMetrics().submitAppAttempt(user);
LOG.info("Added Application Attempt " + applicationAttemptId
- + " to scheduler from user: " + user + ", currently active: "
- + appAttempts.size());
+ + " to scheduler from user: " + user);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
@@ -704,24 +711,38 @@ public class FairScheduler implements Re
private synchronized void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
+ SchedulerApplication application = applications.get(applicationId);
+ if (application == null){
+ LOG.warn("Couldn't find application " + applicationId);
+ return;
+ }
+ application.stop(finalState);
applications.remove(applicationId);
}
private synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState) {
+ RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
- FSSchedulerApp application = appAttempts.get(applicationAttemptId);
-
- if (application == null) {
+ if (attempt == null || application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
// Release all the running containers
- for (RMContainer rmContainer : application.getLiveContainers()) {
+ for (RMContainer rmContainer : attempt.getLiveContainers()) {
+ if (keepContainers
+ && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + rmContainer.getContainerId());
+ continue;
+ }
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
@@ -730,30 +751,26 @@ public class FairScheduler implements Re
}
// Release all reserved containers
- for (RMContainer rmContainer : application.getReservedContainers()) {
+ for (RMContainer rmContainer : attempt.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
- RMContainerEventType.KILL);
+ RMContainerEventType.KILL);
}
-
// Clean up pending requests, metrics etc.
- application.stop(rmAppAttemptFinalState);
+ attempt.stop(rmAppAttemptFinalState);
// Inform the queue
- FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+ FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
.getQueueName(), false);
- boolean wasRunnable = queue.removeApp(application);
+ boolean wasRunnable = queue.removeApp(attempt);
if (wasRunnable) {
- maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
} else {
- maxRunningEnforcer.untrackNonRunnableApp(application);
+ maxRunningEnforcer.untrackNonRunnableApp(attempt);
}
-
- // Remove from our data-structure
- appAttempts.remove(applicationAttemptId);
}
/**
@@ -769,11 +786,13 @@ public class FairScheduler implements Re
Container container = rmContainer.getContainer();
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+ FSSchedulerApp application =
+ getCurrentAttemptForContainer(container.getId());
+ ApplicationId appId =
+ container.getId().getApplicationAttemptId().getApplicationId();
if (application == null) {
LOG.info("Container " + container + " of" +
- " unknown application " + applicationAttemptId +
+ " unknown application attempt " + appId +
" completed with event " + event);
return;
}
@@ -790,10 +809,9 @@ public class FairScheduler implements Re
updateRootQueueMetrics();
}
- LOG.info("Application " + applicationAttemptId +
- " released container " + container.getId() +
- " on node: " + node +
- " with event: " + event);
+ LOG.info("Application attempt " + application.getApplicationAttemptId()
+ + " released container " + container.getId() + " on node: " + node
+ + " with event: " + event);
}
private synchronized void addNode(RMNode node) {
@@ -844,7 +862,7 @@ public class FairScheduler implements Re
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
- FSSchedulerApp application = appAttempts.get(appAttemptId);
+ FSSchedulerApp application = getSchedulerApp(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@@ -914,12 +932,11 @@ public class FairScheduler implements Re
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- FSSchedulerApp application = appAttempts.get(applicationAttemptId);
+ FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
- LOG.info("Unknown application: " + applicationAttemptId +
- " launched container " + containerId +
- " on node: " + node);
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " launched container " + containerId + " on node: " + node);
return;
}
@@ -1058,28 +1075,34 @@ public class FairScheduler implements Re
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
- return appAttempts.get(appAttemptId);
+ SchedulerApplication app =
+ applications.get(appAttemptId.getApplicationId());
+ if (app != null) {
+ return (FSSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
- if (!appAttempts.containsKey(appAttemptId)) {
+ FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+ if (attempt == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return new SchedulerAppReport(appAttempts.get(appAttemptId));
+ return new SchedulerAppReport(attempt);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
- FSSchedulerApp app = appAttempts.get(appAttemptId);
- if (app == null) {
+ FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+ if (attempt == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return app.getResourceUsageReport();
+ return attempt.getResourceUsageReport();
}
/**
@@ -1145,7 +1168,8 @@ public class FairScheduler implements Re
}
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1153,8 +1177,10 @@ public class FairScheduler implements Re
}
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
- removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState());
+ removeApplicationAttempt(
+ appAttemptRemovedEvent.getApplicationAttemptID(),
+ appAttemptRemovedEvent.getFinalAttemptState(),
+ appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
break;
case CONTAINER_EXPIRED:
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
@@ -1205,6 +1231,9 @@ public class FairScheduler implements Re
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
+ // This stores per-application scheduling information
+ this.applications =
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@@ -1327,5 +1356,4 @@ public class FairScheduler implements Re
queue.collectSchedulerApplications(apps);
return apps;
}
-
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Wed Jan 15 06:06:31 2014
@@ -85,9 +85,7 @@ public class QueueManager {
* could be referred to as just "parent1.queue2".
*/
public FSLeafQueue getLeafQueue(String name, boolean create) {
- if (!name.startsWith(ROOT_QUEUE + ".")) {
- name = ROOT_QUEUE + "." + name;
- }
+ name = ensureRootPrefix(name);
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null && create) {
@@ -175,12 +173,106 @@ public class QueueManager {
}
/**
+ * Make way for the given leaf queue if possible, by removing incompatible
+ * queues with no apps in them. Incompatibility could be due to
+ * (1) leafToCreate currently being a parent, or (2) an existing leaf queue in
+ * the ancestry of leafToCreate.
+ *
+ * We will never remove the root queue or the default queue in this way.
+ *
+ * @return true if we can create leafToCreate or it already exists.
+ */
+ private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
+ leafToCreate = ensureRootPrefix(leafToCreate);
+
+ // Ensure leafToCreate is not root and doesn't have the default queue in its
+ // ancestry.
+ if (leafToCreate.equals(ROOT_QUEUE) ||
+ leafToCreate.startsWith(
+ ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
+ return false;
+ }
+
+ FSQueue queue = queues.get(leafToCreate);
+ // Queue exists already.
+ if (queue != null) {
+ if (queue instanceof FSLeafQueue) {
+ // If it's an already existing leaf, we're ok.
+ return true;
+ } else {
+ // If it's an existing parent queue, remove it if it's empty.
+ return removeQueueIfEmpty(queue);
+ }
+ }
+
+ // Queue doesn't exist already. Check if the new queue would be created
+ // under an existing leaf queue. If so, try removing that leaf queue.
+ int sepIndex = leafToCreate.length();
+ sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+ while (sepIndex != -1) {
+ String prefixString = leafToCreate.substring(0, sepIndex);
+ FSQueue prefixQueue = queues.get(prefixString);
+ if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
+ return removeQueueIfEmpty(prefixQueue);
+ }
+ sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+ }
+ return true;
+ }
+
+ /**
+ * Remove the queue if it and its descendants are all empty.
+ * @param queue the queue to remove, along with its descendants, if empty
+ * @return true if removed, false otherwise
+ */
+ private boolean removeQueueIfEmpty(FSQueue queue) {
+ if (isEmpty(queue)) {
+ removeQueue(queue);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Remove a queue and all its descendants.
+ */
+ private void removeQueue(FSQueue queue) {
+ if (queue instanceof FSLeafQueue) {
+ leafQueues.remove(queue);
+ } else {
+ List<FSQueue> childQueues = queue.getChildQueues();
+ while (!childQueues.isEmpty()) {
+ removeQueue(childQueues.get(0));
+ }
+ }
+ queues.remove(queue.getName());
+ queue.getParent().getChildQueues().remove(queue);
+ }
+
+ /**
+ * Returns true if there are no applications, running or not, in the given
+ * queue or any of its descendants.
+ */
+ protected boolean isEmpty(FSQueue queue) {
+ if (queue instanceof FSLeafQueue) {
+ FSLeafQueue leafQueue = (FSLeafQueue)queue;
+ return queue.getNumRunnableApps() == 0 &&
+ leafQueue.getNonRunnableAppSchedulables().isEmpty();
+ } else {
+ for (FSQueue child : queue.getChildQueues()) {
+ if (!isEmpty(child)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
* Gets a queue by name.
*/
public FSQueue getQueue(String name) {
- if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
- name = ROOT_QUEUE + "." + name;
- }
+ name = ensureRootPrefix(name);
synchronized (queues) {
return queues.get(name);
}
@@ -190,9 +282,7 @@ public class QueueManager {
* Return whether a queue exists already.
*/
public boolean exists(String name) {
- if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
- name = ROOT_QUEUE + "." + name;
- }
+ name = ensureRootPrefix(name);
synchronized (queues) {
return queues.containsKey(name);
}
@@ -214,10 +304,19 @@ public class QueueManager {
return queues.values();
}
+ private String ensureRootPrefix(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+ name = ROOT_QUEUE + "." + name;
+ }
+ return name;
+ }
+
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
// Make sure all queues exist
for (String name : queueConf.getQueueNames()) {
- getLeafQueue(name, true);
+ if (removeEmptyIncompatibleQueues(name)) {
+ getLeafQueue(name, true);
+ }
}
for (FSQueue queue : queues.values()) {
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Jan 15 06:06:31 2014
@@ -67,9 +67,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -103,7 +105,8 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
-public class FifoScheduler implements ResourceScheduler, Configurable {
+public class FifoScheduler extends AbstractYarnScheduler implements
+ ResourceScheduler, Configurable {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -114,7 +117,6 @@ public class FifoScheduler implements Re
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
- private RMContext rmContext;
protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
@@ -123,15 +125,6 @@ public class FifoScheduler implements Re
private Resource maximumAllocation;
private boolean usePortForNodeName;
- @VisibleForTesting
- protected Map<ApplicationId, SchedulerApplication> applications =
- new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
-
- // Use ConcurrentSkipListMap because applications need to be ordered
- @VisibleForTesting
- protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
- = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
private ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
@@ -246,6 +239,9 @@ public class FifoScheduler implements Re
if (!this.initialized) {
validateConf(conf);
this.rmContext = rmContext;
+ //Use ConcurrentSkipListMap because applications need to be ordered
+ this.applications =
+ new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -270,7 +266,7 @@ public class FifoScheduler implements Re
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -336,22 +332,26 @@ public class FifoScheduler implements Re
}
@VisibleForTesting
- FiCaSchedulerApp getApplication(
- ApplicationAttemptId applicationAttemptId) {
- return appAttempts.get(applicationAttemptId);
+ FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication app =
+ applications.get(applicationAttemptId.getApplicationId());
+ if (app != null) {
+ return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
@@ -362,15 +362,18 @@ public class FifoScheduler implements Re
private synchronized void addApplication(ApplicationId applicationId,
String queue, String user) {
SchedulerApplication application =
- new SchedulerApplication(null, user);
+ new SchedulerApplication(DEFAULT_QUEUE, user);
applications.put(applicationId, application);
- LOG.info("Accepted application " + applicationId + " from user: " + user);
+ metrics.submitApp(user);
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", currently num of applications: " + applications.size());
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
- private synchronized void addApplicationAttempt(
- ApplicationAttemptId appAttemptId) {
+ private synchronized void
+ addApplicationAttempt(ApplicationAttemptId appAttemptId,
+ boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
@@ -378,11 +381,16 @@ public class FifoScheduler implements Re
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
activeUsersManager, this.rmContext);
- appAttempts.put(appAttemptId, schedulerApp);
- metrics.submitApp(user, appAttemptId.getAttemptId());
+
+ if (transferStateFromPreviousAttempt) {
+ schedulerApp.transferStateFromPreviousAttempt(application
+ .getCurrentAppAttempt());
+ }
+ application.setCurrentAppAttempt(schedulerApp);
+
+ metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
- + " to scheduler from user " + application.getUser()
- + ", currently active: " + appAttempts.size());
+ + " to scheduler from user " + application.getUser());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
@@ -391,37 +399,47 @@ public class FifoScheduler implements Re
private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId);
+ if (application == null){
+ LOG.warn("Couldn't find application " + applicationId);
+ return;
+ }
// Inform the activeUsersManager
activeUsersManager.deactivateApplication(application.getUser(),
applicationId);
+ application.stop(finalState);
applications.remove(applicationId);
}
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState)
+ RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
throws IOException {
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
- if (application == null) {
+ FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ if (application == null || attempt == null) {
throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
}
// Kill all 'live' containers
- for (RMContainer container : application.getLiveContainers()) {
- containerCompleted(container,
- SchedulerUtils.createAbnormalContainerStatus(
- container.getContainerId(),
- SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+ for (RMContainer container : attempt.getLiveContainers()) {
+ if (keepContainers
+ && container.getState().equals(RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + container.getContainerId());
+ continue;
+ }
+ containerCompleted(container,
+ SchedulerUtils.createAbnormalContainerStatus(
+ container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
- application.stop(rmAppAttemptFinalState);
-
- // Remove the application
- appAttempts.remove(applicationAttemptId);
+ attempt.stop(rmAppAttemptFinalState);
}
/**
@@ -432,12 +450,13 @@ public class FifoScheduler implements Re
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
- " #applications=" + appAttempts.size());
+ " #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
+ for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
.entrySet()) {
- FiCaSchedulerApp application = e.getValue();
+ FiCaSchedulerApp application =
+ (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -474,8 +493,10 @@ public class FifoScheduler implements Re
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (FiCaSchedulerApp application : appAttempts.values()) {
- application.setHeadroom(Resources.subtract(clusterResource, usedResource));
+ for (SchedulerApplication application : applications.values()) {
+ FiCaSchedulerApp attempt =
+ (FiCaSchedulerApp) application.getCurrentAppAttempt();
+ attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
@@ -744,7 +765,8 @@ public class FifoScheduler implements Re
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -754,7 +776,8 @@ public class FifoScheduler implements Re
try {
doneApplicationAttempt(
appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState());
+ appAttemptRemovedEvent.getFinalAttemptState(),
+ appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
} catch(IOException ie) {
LOG.error("Unable to remove application "
+ appAttemptRemovedEvent.getApplicationAttemptID(), ie);
@@ -780,12 +803,11 @@ public class FifoScheduler implements Re
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
- LOG.info("Unknown application: " + applicationAttemptId +
- " launched container " + containerId +
- " on node: " + node);
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " launched container " + containerId + " on node: " + node);
// Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
@@ -806,14 +828,16 @@ public class FifoScheduler implements Re
// Get the application for the finished container
Container container = rmContainer.getContainer();
- ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application =
+ getCurrentAttemptForContainer(container.getId());
+ ApplicationId appId =
+ container.getId().getApplicationAttemptId().getApplicationId();
// Get the node on which the container was allocated
FiCaSchedulerNode node = getNode(container.getNodeId());
if (application == null) {
- LOG.info("Unknown application: " + applicationAttemptId +
+ LOG.info("Unknown application: " + appId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@@ -829,7 +853,7 @@ public class FifoScheduler implements Re
// Update total usage
Resources.subtractFrom(usedResource, container.getResource());
- LOG.info("Application " + applicationAttemptId +
+ LOG.info("Application attempt " + application.getApplicationAttemptId() +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@@ -887,11 +911,22 @@ public class FifoScheduler implements Re
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
-
- private RMContainer getRMContainer(ContainerId containerId) {
- FiCaSchedulerApp application =
- getApplication(containerId.getApplicationAttemptId());
- return (application == null) ? null : application.getRMContainer(containerId);
+
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
+ FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+ return (attempt == null) ? null : attempt.getRMContainer(containerId);
+ }
+
+ private FiCaSchedulerApp getCurrentAttemptForContainer(
+ ContainerId containerId) {
+ SchedulerApplication app =
+ applications.get(containerId.getApplicationAttemptId()
+ .getApplicationId());
+ if (app != null) {
+ return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
@@ -908,12 +943,12 @@ public class FifoScheduler implements Re
@Override
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
- List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
- appAttempts.size());
- for (FiCaSchedulerApp app : appAttempts.values()) {
- apps.add(app.getApplicationAttemptId());
+ List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
+ applications.size());
+ for (SchedulerApplication app : applications.values()) {
+ attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
}
- return apps;
+ return attempts;
} else {
return null;
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java Wed Jan 15 06:06:31 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
@@ -53,6 +54,9 @@ public class RMPolicyProvider extends Po
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL,
ContainerManagementProtocolPB.class),
+ new Service(
+ CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
+ HAServiceProtocol.class),
};
@Override
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AboutBlock.java Wed Jan 15 06:06:31 2014
@@ -43,6 +43,7 @@ public class AboutBlock extends HtmlBloc
info("Cluster overview").
_("Cluster ID:", cinfo.getClusterId()).
_("ResourceManager state:", cinfo.getState()).
+ _("ResourceManager HA state:", cinfo.getHAState()).
_("ResourceManager started on:", Times.format(cinfo.getStartedOn())).
_("ResourceManager version:", cinfo.getRMBuildVersion() +
" on " + cinfo.getRMVersionBuiltOn()).
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java Wed Jan 15 06:06:31 2014
@@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAcce
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -33,6 +34,7 @@ public class ClusterInfo {
protected long id;
protected long startedOn;
protected STATE state;
+ protected HAServiceProtocol.HAServiceState haState;
protected String resourceManagerVersion;
protected String resourceManagerBuildVersion;
protected String resourceManagerVersionBuiltOn;
@@ -48,6 +50,7 @@ public class ClusterInfo {
this.id = ts;
this.state = rm.getServiceState();
+ this.haState = rm.getRMContext().getHAServiceState();
this.startedOn = ts;
this.resourceManagerVersion = YarnVersionInfo.getVersion();
this.resourceManagerBuildVersion = YarnVersionInfo.getBuildVersion();
@@ -61,6 +64,10 @@ public class ClusterInfo {
return this.state.toString();
}
+ public String getHAState() {
+ return this.haState.toString();
+ }
+
public String getRMVersion() {
return this.resourceManagerVersion;
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Wed Jan 15 06:06:31 2014
@@ -171,7 +171,7 @@ public class Application {
new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
scheduler.handle(addAppEvent);
AppAttemptAddedSchedulerEvent addAttemptEvent =
- new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
+ new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
scheduler.handle(addAttemptEvent);
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Wed Jan 15 06:06:31 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.mortbay.log.Log;
public class MockNM {
@@ -130,12 +131,13 @@ public class MockNM {
int containerId, ContainerState containerState) throws Exception {
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
new HashMap<ApplicationId, List<ContainerStatus>>(1);
- ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
- BuilderUtils.newContainerId(attemptId, 1),
- ContainerState.COMPLETE, "Success", 0);
+ ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+ BuilderUtils.newContainerId(attemptId, containerId), containerState,
+ "Success", 0);
ArrayList<ContainerStatus> containerStatusList =
new ArrayList<ContainerStatus>(1);
- containerStatusList.add(amContainerStatus);
+ containerStatusList.add(containerStatus);
+ Log.info("ContainerStatus: " + containerStatus);
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
return nodeHeartbeat(nodeUpdate, true);
}
@@ -152,6 +154,7 @@ public class MockNM {
status.setResponseId(resId);
status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
+ Log.info("entry.getValue() " + entry.getValue());
status.setContainersStatuses(entry.getValue());
}
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Jan 15 06:06:31 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -40,7 +41,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,6 +60,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -122,6 +128,33 @@ public class MockRM extends ResourceMana
attempt.getAppAttemptState());
}
+ public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
+ throws Exception {
+ int timeoutSecs = 0;
+ while (getResourceScheduler().getRMContainer(containerId) == null
+ && timeoutSecs++ < 40) {
+ System.out.println("Waiting for" + containerId + " to be allocated.");
+ nm.nodeHeartbeat(true);
+ Thread.sleep(200);
+ }
+ }
+
+ public void waitForState(MockNM nm, ContainerId containerId,
+ RMContainerState containerState) throws Exception {
+ RMContainer container = getResourceScheduler().getRMContainer(containerId);
+ Assert.assertNotNull("Container shouldn't be null", container);
+ int timeoutSecs = 0;
+ while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
+ System.out.println("Container : " + containerId + " State is : "
+ + container.getState() + " Waiting for state : " + containerState);
+ nm.nodeHeartbeat(true);
+ Thread.sleep(300);
+ }
+ System.out.println("Container State is : " + container.getState());
+ Assert.assertEquals("Container state is not correct (timedout)",
+ containerState, container.getState());
+ }
+
// get new application id
public GetNewApplicationResponse getNewAppId() throws Exception {
ApplicationClientProtocol client = getClientRMService();
@@ -172,7 +205,17 @@ public class MockRM extends ResourceMana
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
- boolean waitForAccepted) throws Exception {
+ boolean waitForAccepted)
+ throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, false);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers)
+ throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -182,6 +225,7 @@ public class MockRM extends ResourceMana
.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
.newRecord(ApplicationSubmissionContext.class);
+ sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
@@ -421,4 +465,26 @@ public class MockRM extends ResourceMana
// override to disable webapp
}
+ public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+ MockAM am) throws Exception {
+ FinishApplicationMasterRequest req =
+ FinishApplicationMasterRequest.newInstance(
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ am.unregisterAppAttempt(req);
+ am.waitForState(RMAppAttemptState.FINISHING);
+ nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+ rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
+ }
+
+ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+ throws Exception {
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ nm.nodeHeartbeat(true);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+ return am;
+ }
+
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Wed Jan 15 06:06:31 2014
@@ -649,7 +649,7 @@ public class TestClientRMService {
.currentTimeMillis(), "YARN"));
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
- rmContext, yarnScheduler, null, asContext, config);
+ rmContext, yarnScheduler, null, asContext, config, false);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
return app;
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Jan 15 06:06:31 2014
@@ -302,7 +302,7 @@ public class TestFifoScheduler {
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId1);
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
fs.handle(attemptEvent);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
@@ -396,7 +396,7 @@ public class TestFifoScheduler {
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId1);
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
fs.handle(attemptEvent);
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
@@ -406,7 +406,7 @@ public class TestFifoScheduler {
new AppAddedSchedulerEvent(appId2, "queue", "user");
fs.handle(appEvent2);
SchedulerEvent attemptEvent2 =
- new AppAttemptAddedSchedulerEvent(appAttemptId2);
+ new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
fs.handle(attemptEvent2);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Wed Jan 15 06:06:31 2014
@@ -28,7 +28,6 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
@@ -38,7 +37,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
@@ -295,6 +293,8 @@ public class TestRM {
nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
ContainerState.COMPLETE);
}
+ nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,
+ ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
Assert.assertFalse(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@@ -389,19 +389,19 @@ public class TestRM {
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
- MockAM am1 = launchAM(app1, rm1, nm1);
- finishApplicationMaster(app1, rm1, nm1, am1);
+ MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+ MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
// a failed app
RMApp app2 = rm1.submitApp(200);
- MockAM am2 = launchAM(app2, rm1, nm1);
+ MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
// a killed app
RMApp app3 = rm1.submitApp(200);
- MockAM am3 = launchAM(app3, rm1, nm1);
+ MockAM am3 = MockRM.launchAM(app3, rm1, nm1);
rm1.killApp(app3.getApplicationId());
rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@@ -441,7 +441,7 @@ public class TestRM {
// a failed app
RMApp app2 = rm1.submitApp(200);
- MockAM am2 = launchAM(app2, rm1, nm1);
+ MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
nm1
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
@@ -458,28 +458,6 @@ public class TestRM {
Assert.assertEquals(-1, report1.getRpcPort());
}
- private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
- throws Exception {
- RMAppAttempt attempt = app.getCurrentAppAttempt();
- nm.nodeHeartbeat(true);
- MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
- am.registerAppAttempt();
- rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
- return am;
- }
-
- private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
- MockAM am) throws Exception {
- FinishApplicationMasterRequest req =
- FinishApplicationMasterRequest.newInstance(
- FinalApplicationStatus.SUCCEEDED, "", "");
- am.unregisterAppAttempt(req);
- am.waitForState(RMAppAttemptState.FINISHING);
- nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
- am.waitForState(RMAppAttemptState.FINISHED);
- rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
- }
-
public static void main(String[] args) throws Exception {
TestRM t = new TestRM();
t.testGetNewAppId();
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Wed Jan 15 06:06:31 2014
@@ -26,8 +26,11 @@ import org.apache.hadoop.ha.HAServicePro
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Before;
import org.junit.Test;
@@ -222,4 +225,81 @@ public class TestRMHA {
checkMonitorHealth();
checkActiveRMFunctionality();
}
+
+ @Test
+ public void testRMDispatcherForHA() throws IOException {
+ String errorMessageForEventHandler =
+ "Expect to get the same number of handlers";
+ String errorMessageForService = "Expect to get the same number of services";
+ Configuration conf = new YarnConfiguration(configuration);
+ rm = new MockRM(conf) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return new MyCountingDispatcher();
+ }
+ };
+ rm.init(conf);
+ int expectedEventHandlerCount =
+ ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
+ .getEventHandlerCount();
+ int expectedServiceCount = rm.getServices().size();
+ assertTrue(expectedEventHandlerCount != 0);
+
+ StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
+ rm.adminService.getServiceStatus().getState());
+ assertFalse("RM is ready to become active before being started",
+ rm.adminService.getServiceStatus().isReadyToBecomeActive());
+ rm.start();
+
+ //call transitions to standby and active a couple of times
+ rm.adminService.transitionToStandby(requestInfo);
+ rm.adminService.transitionToActive(requestInfo);
+ rm.adminService.transitionToStandby(requestInfo);
+ rm.adminService.transitionToActive(requestInfo);
+ rm.adminService.transitionToStandby(requestInfo);
+
+ rm.adminService.transitionToActive(requestInfo);
+ assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
+ ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
+ .getEventHandlerCount());
+ assertEquals(errorMessageForService, expectedServiceCount,
+ rm.getServices().size());
+
+ rm.adminService.transitionToStandby(requestInfo);
+ assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
+ ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
+ .getEventHandlerCount());
+ assertEquals(errorMessageForService, expectedServiceCount,
+ rm.getServices().size());
+
+ rm.stop();
+ }
+
+ @SuppressWarnings("rawtypes")
+ class MyCountingDispatcher extends AbstractService implements Dispatcher {
+
+ private int eventHandlerCount;
+
+ public MyCountingDispatcher() {
+ super("MyCountingDispatcher");
+ this.eventHandlerCount = 0;
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return null;
+ }
+
+ @Override
+ public void register(Class<? extends Enum> eventType, EventHandler handler) {
+ this.eventHandlerCount ++;
+ }
+
+ public int getEventHandlerCount() {
+ return this.eventHandlerCount;
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Jan 15 06:06:31 2014
@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
@@ -179,7 +180,7 @@ public class TestRMRestart {
am1.registerAppAttempt();
// AM request for containers
- am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
+ am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
// kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
@@ -1543,6 +1544,128 @@ public class TestRMRestart {
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
}
+  /**
+   * Checks the root queue's application counters (submitted, pending,
+   * running, completed) on a first RM and then on a second RM that is
+   * started from the same in-memory state store.
+   */
+  @SuppressWarnings("resource")
+  @Test
+  public void testQueueMetricsOnRMRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // PHASE 1: create state in an RM
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics();
+    // Record the counters already present so that every assertion below
+    // is relative to this baseline.
+    resetQueueMetrics(qm1);
+    assertQueueMetrics(qm1, 0, 0, 0, 0);
+
+    // create app that gets launched and does allocate before RM restart
+    RMApp app1 = rm1.submitApp(200);
+    assertQueueMetrics(qm1, 1, 1, 0, 0);
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    am1.allocate("127.0.0.1", 1000, 1, new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    // Keep heartbeating and allocating until at least one container arrives.
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+    assertQueueMetrics(qm1, 1, 0, 1, 0);
+
+    // PHASE 2: create new RM and start from old state
+    // create new RM to represent restart and recover state
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
+    // Take a fresh baseline for the second RM's metrics.
+    resetQueueMetrics(qm2);
+    assertQueueMetrics(qm2, 0, 0, 0, 0);
+    // recover app
+    RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    // Register the new node while reporting container 1 of the recovered
+    // attempt as COMPLETE (143 is presumably the exit status).
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+    ContainerStatus containerStatus =
+        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
+            .getCurrentAppAttempt().getAppAttemptId(), 1),
+            ContainerState.COMPLETE, "Killed AM container", 143);
+    containerStatuses.add(containerStatus);
+    nm1.registerNode(containerStatuses);
+    // Poll in 200 ms steps, giving up after 40 checks, until the app has
+    // a second attempt.
+    int pollCount = 0;
+    while (loadedApp1.getAppAttempts().size() != 2 && pollCount++ < 40) {
+      Thread.sleep(200);
+    }
+
+    assertQueueMetrics(qm2, 1, 1, 0, 0);
+    nm1.nodeHeartbeat(true);
+    attempt1 = loadedApp1.getCurrentAppAttempt();
+    attemptId1 = attempt1.getAppAttemptId();
+    rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    assertQueueMetrics(qm2, 1, 0, 1, 0);
+    am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    am1.allocate("127.0.0.1", 1000, 3, new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    conts = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    // Same wait-for-allocation loop as in phase 1.
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+
+    // finish the AMs
+    finishApplicationMaster(loadedApp1, rm2, nm1, am1);
+    assertQueueMetrics(qm2, 1, 0, 0, 1);
+
+    // stop RM's
+    rm2.stop();
+    rm1.stop();
+  }
+
+
+ // The metrics carry over values from the previous RM: because the test
+ // case is in-memory, there is always a singleton QueueMetrics object for
+ // a given queue name (e.g. root).
+ private int appsSubmittedCarryOn = 0;
+ private int appsPendingCarryOn = 0;
+ private int appsRunningCarryOn = 0;
+ private int appsCompletedCarryOn = 0;
+
+  /**
+   * Stores the four application counters currently reported by {@code qm}
+   * in the carry-on fields, so that later assertions can be expressed
+   * relative to them.
+   */
+  private void resetQueueMetrics(QueueMetrics qm) {
+    this.appsSubmittedCarryOn = qm.getAppsSubmitted();
+    this.appsPendingCarryOn = qm.getAppsPending();
+    this.appsRunningCarryOn = qm.getAppsRunning();
+    this.appsCompletedCarryOn = qm.getAppsCompleted();
+  }
+
+  /**
+   * Asserts the four application counters of {@code qm}. Each expected
+   * value is given relative to the baseline stored in the carry-on fields,
+   * which is added back here before comparing.
+   *
+   * @param qm the metrics object to inspect
+   * @param appsSubmitted expected submitted-app count above the baseline
+   * @param appsPending expected pending-app count above the baseline
+   * @param appsRunning expected running-app count above the baseline
+   * @param appsCompleted expected completed-app count above the baseline
+   */
+  private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
+      int appsPending, int appsRunning, int appsCompleted) {
+    // The expected value goes first so that a failure message labels
+    // "expected" and "actual" correctly.
+    Assert.assertEquals(appsSubmitted + appsSubmittedCarryOn,
+        qm.getAppsSubmitted());
+    Assert.assertEquals(appsPending + appsPendingCarryOn,
+        qm.getAppsPending());
+    Assert.assertEquals(appsRunning + appsRunningCarryOn,
+        qm.getAppsRunning());
+    Assert.assertEquals(appsCompleted + appsCompletedCarryOn,
+        qm.getAppsCompleted());
+  }
+
public class TestMemoryRMStateStore extends MemoryRMStateStore {
int count = 0;
public int updateApp = 0;
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Wed Jan 15 06:06:31 2014
@@ -164,7 +164,7 @@ public class TestResourceManager {
// Notify scheduler application is finished.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
- application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
+ application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
checkResourceUsage(nm1, nm2);