You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/10 16:05:19 UTC
svn commit: r1557144 [2/3] - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/proto/
hadoop-yarn/hadoop-yarn-common/src/main/j...
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Jan 10 15:05:18 2014
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -123,15 +124,11 @@ public class FifoScheduler implements Re
private Resource maximumAllocation;
private boolean usePortForNodeName;
+ // Use ConcurrentSkipListMap because applications need to be ordered
@VisibleForTesting
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
- // Use ConcurrentSkipListMap because applications need to be ordered
- @VisibleForTesting
- protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
- = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
private ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
@@ -270,7 +267,7 @@ public class FifoScheduler implements Re
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -336,22 +333,26 @@ public class FifoScheduler implements Re
}
@VisibleForTesting
- FiCaSchedulerApp getApplication(
- ApplicationAttemptId applicationAttemptId) {
- return appAttempts.get(applicationAttemptId);
+ FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication app =
+ applications.get(applicationAttemptId.getApplicationId());
+ if (app != null) {
+ return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
@@ -364,13 +365,15 @@ public class FifoScheduler implements Re
SchedulerApplication application =
new SchedulerApplication(null, user);
applications.put(applicationId, application);
- LOG.info("Accepted application " + applicationId + " from user: " + user);
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", currently num of applications: " + applications.size());
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
- private synchronized void addApplicationAttempt(
- ApplicationAttemptId appAttemptId) {
+ private synchronized void
+ addApplicationAttempt(ApplicationAttemptId appAttemptId,
+ boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
@@ -378,11 +381,16 @@ public class FifoScheduler implements Re
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
activeUsersManager, this.rmContext);
- appAttempts.put(appAttemptId, schedulerApp);
+
+ if (transferStateFromPreviousAttempt) {
+ schedulerApp.transferStateFromPreviousAttempt(application
+ .getCurrentAppAttempt());
+ }
+ application.setCurrentAppAttempt(schedulerApp);
+
metrics.submitApp(user, appAttemptId.getAttemptId());
LOG.info("Added Application Attempt " + appAttemptId
- + " to scheduler from user " + application.getUser()
- + ", currently active: " + appAttempts.size());
+ + " to scheduler from user " + application.getUser());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
@@ -400,28 +408,33 @@ public class FifoScheduler implements Re
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState)
+ RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
throws IOException {
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
- if (application == null) {
+ FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ if (application == null || attempt == null) {
throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
}
// Kill all 'live' containers
- for (RMContainer container : application.getLiveContainers()) {
- containerCompleted(container,
- SchedulerUtils.createAbnormalContainerStatus(
- container.getContainerId(),
- SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+ for (RMContainer container : attempt.getLiveContainers()) {
+ if (keepContainers
+ && container.getState().equals(RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + container.getContainerId());
+ continue;
+ }
+ containerCompleted(container,
+ SchedulerUtils.createAbnormalContainerStatus(
+ container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
- application.stop(rmAppAttemptFinalState);
-
- // Remove the application
- appAttempts.remove(applicationAttemptId);
+ attempt.stop(rmAppAttemptFinalState);
}
/**
@@ -432,12 +445,13 @@ public class FifoScheduler implements Re
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
- " #applications=" + appAttempts.size());
+ " #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
+ for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
.entrySet()) {
- FiCaSchedulerApp application = e.getValue();
+ FiCaSchedulerApp application =
+ (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -474,8 +488,10 @@ public class FifoScheduler implements Re
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (FiCaSchedulerApp application : appAttempts.values()) {
- application.setHeadroom(Resources.subtract(clusterResource, usedResource));
+ for (SchedulerApplication application : applications.values()) {
+ FiCaSchedulerApp attempt =
+ (FiCaSchedulerApp) application.getCurrentAppAttempt();
+ attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
@@ -744,7 +760,8 @@ public class FifoScheduler implements Re
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
- addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -754,7 +771,8 @@ public class FifoScheduler implements Re
try {
doneApplicationAttempt(
appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState());
+ appAttemptRemovedEvent.getFinalAttemptState(),
+ appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
} catch(IOException ie) {
LOG.error("Unable to remove application "
+ appAttemptRemovedEvent.getApplicationAttemptID(), ie);
@@ -780,12 +798,11 @@ public class FifoScheduler implements Re
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
- ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
- LOG.info("Unknown application: " + applicationAttemptId +
- " launched container " + containerId +
- " on node: " + node);
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " launched container " + containerId + " on node: " + node);
// Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
@@ -806,14 +823,16 @@ public class FifoScheduler implements Re
// Get the application for the finished container
Container container = rmContainer.getContainer();
- ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- FiCaSchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application =
+ getCurrentAttemptForContainer(container.getId());
+ ApplicationId appId =
+ container.getId().getApplicationAttemptId().getApplicationId();
// Get the node on which the container was allocated
FiCaSchedulerNode node = getNode(container.getNodeId());
if (application == null) {
- LOG.info("Unknown application: " + applicationAttemptId +
+ LOG.info("Unknown application: " + appId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@@ -829,7 +848,7 @@ public class FifoScheduler implements Re
// Update total usage
Resources.subtractFrom(usedResource, container.getResource());
- LOG.info("Application " + applicationAttemptId +
+ LOG.info("Application attempt " + application.getApplicationAttemptId() +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@@ -887,11 +906,22 @@ public class FifoScheduler implements Re
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
-
- private RMContainer getRMContainer(ContainerId containerId) {
- FiCaSchedulerApp application =
- getApplication(containerId.getApplicationAttemptId());
- return (application == null) ? null : application.getRMContainer(containerId);
+
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
+ FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+ return (attempt == null) ? null : attempt.getRMContainer(containerId);
+ }
+
+ private FiCaSchedulerApp getCurrentAttemptForContainer(
+ ContainerId containerId) {
+ SchedulerApplication app =
+ applications.get(containerId.getApplicationAttemptId()
+ .getApplicationId());
+ if (app != null) {
+ return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+ }
+ return null;
}
@Override
@@ -908,12 +938,12 @@ public class FifoScheduler implements Re
@Override
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
- List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
- appAttempts.size());
- for (FiCaSchedulerApp app : appAttempts.values()) {
- apps.add(app.getApplicationAttemptId());
+ List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
+ applications.size());
+ for (SchedulerApplication app : applications.values()) {
+ attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
}
- return apps;
+ return attempts;
} else {
return null;
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Fri Jan 10 15:05:18 2014
@@ -171,7 +171,7 @@ public class Application {
new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
scheduler.handle(addAppEvent);
AppAttemptAddedSchedulerEvent addAttemptEvent =
- new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
+ new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
scheduler.handle(addAttemptEvent);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Fri Jan 10 15:05:18 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.mortbay.log.Log;
public class MockNM {
@@ -130,12 +131,13 @@ public class MockNM {
int containerId, ContainerState containerState) throws Exception {
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
new HashMap<ApplicationId, List<ContainerStatus>>(1);
- ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
- BuilderUtils.newContainerId(attemptId, 1),
- ContainerState.COMPLETE, "Success", 0);
+ ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+ BuilderUtils.newContainerId(attemptId, containerId), containerState,
+ "Success", 0);
ArrayList<ContainerStatus> containerStatusList =
new ArrayList<ContainerStatus>(1);
- containerStatusList.add(amContainerStatus);
+ containerStatusList.add(containerStatus);
+ Log.info("ContainerStatus: " + containerStatus);
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
return nodeHeartbeat(nodeUpdate, true);
}
@@ -152,6 +154,7 @@ public class MockNM {
status.setResponseId(resId);
status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
+ Log.info("entry.getValue() " + entry.getValue());
status.setContainersStatuses(entry.getValue());
}
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Fri Jan 10 15:05:18 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -40,7 +41,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,6 +60,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -122,6 +128,33 @@ public class MockRM extends ResourceMana
attempt.getAppAttemptState());
}
+ public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
+ throws Exception {
+ int timeoutSecs = 0;
+ while (getResourceScheduler().getRMContainer(containerId) == null
+ && timeoutSecs++ < 40) {
+ System.out.println("Waiting for" + containerId + " to be allocated.");
+ nm.nodeHeartbeat(true);
+ Thread.sleep(200);
+ }
+ }
+
+ public void waitForState(MockNM nm, ContainerId containerId,
+ RMContainerState containerState) throws Exception {
+ RMContainer container = getResourceScheduler().getRMContainer(containerId);
+ Assert.assertNotNull("Container shouldn't be null", container);
+ int timeoutSecs = 0;
+ while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
+ System.out.println("Container : " + containerId + " State is : "
+ + container.getState() + " Waiting for state : " + containerState);
+ nm.nodeHeartbeat(true);
+ Thread.sleep(300);
+ }
+ System.out.println("Container State is : " + container.getState());
+ Assert.assertEquals("Container state is not correct (timedout)",
+ containerState, container.getState());
+ }
+
// get new application id
public GetNewApplicationResponse getNewAppId() throws Exception {
ApplicationClientProtocol client = getClientRMService();
@@ -172,7 +205,17 @@ public class MockRM extends ResourceMana
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
- boolean waitForAccepted) throws Exception {
+ boolean waitForAccepted)
+ throws Exception {
+ return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, false);
+ }
+
+ public RMApp submitApp(int masterMemory, String name, String user,
+ Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+ int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers)
+ throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -182,6 +225,7 @@ public class MockRM extends ResourceMana
.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
.newRecord(ApplicationSubmissionContext.class);
+ sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
@@ -421,4 +465,26 @@ public class MockRM extends ResourceMana
// override to disable webapp
}
+ public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+ MockAM am) throws Exception {
+ FinishApplicationMasterRequest req =
+ FinishApplicationMasterRequest.newInstance(
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ am.unregisterAppAttempt(req);
+ am.waitForState(RMAppAttemptState.FINISHING);
+ nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+ rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
+ }
+
+ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+ throws Exception {
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ nm.nodeHeartbeat(true);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+ return am;
+ }
+
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Fri Jan 10 15:05:18 2014
@@ -649,7 +649,7 @@ public class TestClientRMService {
.currentTimeMillis(), "YARN"));
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
- rmContext, yarnScheduler, null, asContext, config);
+ rmContext, yarnScheduler, null, asContext, config, false);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
return app;
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri Jan 10 15:05:18 2014
@@ -302,7 +302,7 @@ public class TestFifoScheduler {
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId1);
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
fs.handle(attemptEvent);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
@@ -396,7 +396,7 @@ public class TestFifoScheduler {
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId1);
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
fs.handle(attemptEvent);
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
@@ -406,7 +406,7 @@ public class TestFifoScheduler {
new AppAddedSchedulerEvent(appId2, "queue", "user");
fs.handle(appEvent2);
SchedulerEvent attemptEvent2 =
- new AppAttemptAddedSchedulerEvent(appAttemptId2);
+ new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
fs.handle(attemptEvent2);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Fri Jan 10 15:05:18 2014
@@ -28,7 +28,6 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
@@ -38,7 +37,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
@@ -295,6 +293,8 @@ public class TestRM {
nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
ContainerState.COMPLETE);
}
+ nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,
+ ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
Assert.assertFalse(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@@ -389,19 +389,19 @@ public class TestRM {
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
- MockAM am1 = launchAM(app1, rm1, nm1);
- finishApplicationMaster(app1, rm1, nm1, am1);
+ MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+ MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
// a failed app
RMApp app2 = rm1.submitApp(200);
- MockAM am2 = launchAM(app2, rm1, nm1);
+ MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
// a killed app
RMApp app3 = rm1.submitApp(200);
- MockAM am3 = launchAM(app3, rm1, nm1);
+ MockAM am3 = MockRM.launchAM(app3, rm1, nm1);
rm1.killApp(app3.getApplicationId());
rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@@ -441,7 +441,7 @@ public class TestRM {
// a failed app
RMApp app2 = rm1.submitApp(200);
- MockAM am2 = launchAM(app2, rm1, nm1);
+ MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
nm1
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
@@ -458,28 +458,6 @@ public class TestRM {
Assert.assertEquals(-1, report1.getRpcPort());
}
- private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
- throws Exception {
- RMAppAttempt attempt = app.getCurrentAppAttempt();
- nm.nodeHeartbeat(true);
- MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
- am.registerAppAttempt();
- rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
- return am;
- }
-
- private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
- MockAM am) throws Exception {
- FinishApplicationMasterRequest req =
- FinishApplicationMasterRequest.newInstance(
- FinalApplicationStatus.SUCCEEDED, "", "");
- am.unregisterAppAttempt(req);
- am.waitForState(RMAppAttemptState.FINISHING);
- nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
- am.waitForState(RMAppAttemptState.FINISHED);
- rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
- }
-
public static void main(String[] args) throws Exception {
TestRM t = new TestRM();
t.testGetNewAppId();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Fri Jan 10 15:05:18 2014
@@ -164,7 +164,7 @@ public class TestResourceManager {
// Notify scheduler application is finished.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
- application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
+ application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
checkResourceUsage(nm1, nm2);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Fri Jan 10 15:05:18 2014
@@ -18,49 +18,30 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
-import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.Test;
/**
@@ -68,238 +49,164 @@ import org.junit.Test;
*
*/
public class TestAMRestart {
-// private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
-// ApplicationsManagerImpl appImpl;
-// RMContext asmContext = new RMContextImpl(new MemStore());
-// ApplicationTokenSecretManager appTokenSecretManager =
-// new ApplicationTokenSecretManager();
-// DummyResourceScheduler scheduler;
-// private ClientRMService clientRMService;
-// int count = 0;
-// ApplicationId appID;
-// final int maxFailures = 3;
-// AtomicInteger launchNotify = new AtomicInteger();
-// AtomicInteger schedulerNotify = new AtomicInteger();
-// volatile boolean stop = false;
-// int schedulerAddApplication = 0;
-// int schedulerRemoveApplication = 0;
-// int launcherLaunchCalled = 0;
-// int launcherCleanupCalled = 0;
-// private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-//
-// private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
-// public ExtApplicationsManagerImpl(
-// ApplicationTokenSecretManager applicationTokenSecretManager,
-// YarnScheduler scheduler, RMContext asmContext) {
-// super(applicationTokenSecretManager, scheduler, asmContext);
-// }
-//
-// @Override
-// public EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
-// ApplicationTokenSecretManager tokenSecretManager) {
-// return new DummyAMLauncher();
-// }
-// }
-//
-// private class DummyAMLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> {
-//
-// public DummyAMLauncher() {
-// asmContext.getDispatcher().register(AMLauncherEventType.class, this);
-// new Thread() {
-// public void run() {
-// while (!stop) {
-// LOG.info("DEBUG -- waiting for launch");
-// synchronized(launchNotify) {
-// while (launchNotify.get() == 0) {
-// try {
-// launchNotify.wait();
-// } catch (InterruptedException e) {
-// }
-// }
-// asmContext.getDispatcher().getEventHandler().handle(
-// new ApplicationEvent(
-// ApplicationEventType.LAUNCHED, appID));
-// launchNotify.addAndGet(-1);
-// }
-// }
-// }
-// }.start();
-// }
-//
-// @Override
-// public void handle(ASMEvent<AMLauncherEventType> event) {
-// switch (event.getType()) {
-// case CLEANUP:
-// launcherCleanupCalled++;
-// break;
-// case LAUNCH:
-// LOG.info("DEBUG -- launching");
-// launcherLaunchCalled++;
-// synchronized (launchNotify) {
-// launchNotify.addAndGet(1);
-// launchNotify.notify();
-// }
-// break;
-// default:
-// break;
-// }
-// }
-// }
-//
-// private class DummyResourceScheduler implements ResourceScheduler {
-//
-// @Override
-// public void removeNode(RMNode node) {
-// }
-//
-// @Override
-// public Allocation allocate(ApplicationId applicationId,
-// List<ResourceRequest> ask, List<Container> release) throws IOException {
-// Container container = recordFactory.newRecordInstance(Container.class);
-// container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
-// container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
-// container.setContainerManagerAddress("localhost");
-// container.setNodeHttpAddress("localhost:8042");
-// container.setId(recordFactory.newRecordInstance(ContainerId.class));
-// container.getId().setAppId(appID);
-// container.getId().setId(count);
-// count++;
-// return new Allocation(Arrays.asList(container), Resources.none());
-// }
-//
-// @Override
-// public void handle(ASMEvent<ApplicationTrackerEventType> event) {
-// switch (event.getType()) {
-// case ADD:
-// schedulerAddApplication++;
-// break;
-// case EXPIRE:
-// schedulerRemoveApplication++;
-// LOG.info("REMOVING app : " + schedulerRemoveApplication);
-// if (schedulerRemoveApplication == maxFailures) {
-// synchronized (schedulerNotify) {
-// schedulerNotify.addAndGet(1);
-// schedulerNotify.notify();
-// }
-// }
-// break;
-// default:
-// break;
-// }
-// }
-//
-// @Override
-// public QueueInfo getQueueInfo(String queueName,
-// boolean includeChildQueues,
-// boolean recursive) throws IOException {
-// return null;
-// }
-// @Override
-// public List<QueueUserACLInfo> getQueueUserAclInfo() {
-// return null;
-// }
-// @Override
-// public void addApplication(ApplicationId applicationId,
-// ApplicationMaster master, String user, String queue, Priority priority,
-// ApplicationStore store)
-// throws IOException {
-// }
-// @Override
-// public void addNode(RMNode nodeInfo) {
-// }
-// @Override
-// public void recover(RMState state) throws Exception {
-// }
-// @Override
-// public void reinitialize(Configuration conf,
-// ContainerTokenSecretManager secretManager, RMContext rmContext)
-// throws IOException {
-// }
-//
-// @Override
-// public void nodeUpdate(RMNode nodeInfo,
-// Map<String, List<Container>> containers) {
-// }
-//
-// @Override
-// public Resource getMaximumResourceCapability() {
-// // TODO Auto-generated method stub
-// return null;
-// }
-//
-// @Override
-// public Resource getMinimumResourceCapability() {
-// // TODO Auto-generated method stub
-// return null;
-// }
-// }
-//
-// @Before
-// public void setUp() {
-//
-// asmContext.getDispatcher().register(ApplicationEventType.class,
-// new ResourceManager.ApplicationEventDispatcher(asmContext));
-//
-// appID = recordFactory.newRecordInstance(ApplicationId.class);
-// appID.setClusterTimestamp(System.currentTimeMillis());
-// appID.setId(1);
-// Configuration conf = new Configuration();
-// scheduler = new DummyResourceScheduler();
-// asmContext.getDispatcher().init(conf);
-// asmContext.getDispatcher().start();
-// asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
-// appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
-//
-// conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
-// conf.setInt(RMConfig.AM_MAX_RETRIES, maxFailures);
-// appImpl.init(conf);
-// appImpl.start();
-//
-// this.clientRMService = new ClientRMService(asmContext, appImpl
-// .getAmLivelinessMonitor(), appImpl.getClientToAMSecretManager(),
-// scheduler);
-// this.clientRMService.init(conf);
-// }
-//
-// @After
-// public void tearDown() {
-// }
-//
-// private void waitForFailed(AppAttempt application, ApplicationState
-// finalState) throws Exception {
-// int count = 0;
-// while(application.getState() != finalState && count < 10) {
-// Thread.sleep(500);
-// count++;
-// }
-// Assert.assertEquals(finalState, application.getState());
-// }
-//
-// @Test
-// public void testAMRestart() throws Exception {
-// ApplicationSubmissionContext subContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-// subContext.setApplicationId(appID);
-// subContext.setApplicationName("dummyApp");
-//// subContext.command = new ArrayList<String>();
-//// subContext.environment = new HashMap<String, String>();
-//// subContext.fsTokens = new ArrayList<String>();
-// subContext.setFsTokensTodo(ByteBuffer.wrap(new byte[0]));
-// SubmitApplicationRequest request = recordFactory
-// .newRecordInstance(SubmitApplicationRequest.class);
-// request.setApplicationSubmissionContext(subContext);
-// clientRMService.submitApplication(request);
-// AppAttempt application = asmContext.getApplications().get(appID);
-// synchronized (schedulerNotify) {
-// while(schedulerNotify.get() == 0) {
-// schedulerNotify.wait();
-// }
-// }
-// Assert.assertEquals(maxFailures, launcherCleanupCalled);
-// Assert.assertEquals(maxFailures, launcherLaunchCalled);
-// Assert.assertEquals(maxFailures, schedulerAddApplication);
-// Assert.assertEquals(maxFailures, schedulerRemoveApplication);
-// Assert.assertEquals(maxFailures, application.getFailedCount());
-// waitForFailed(application, ApplicationState.FAILED);
-// stop = true;
-// }
+
+ @Test
+ public void testAMRestartWithExistingContainers() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ RMApp app1 =
+ rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+ null, "MAPREDUCE", false, true);
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ MockNM nm2 =
+ new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService());
+ nm2.registerNode();
+
+ MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+ int NUM_CONTAINERS = 3;
+ // allocate NUM_CONTAINERS containers
+ am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
+ new ArrayList<ContainerId>());
+ nm1.nodeHeartbeat(true);
+
+ // wait for containers to be allocated.
+ List<Container> containers =
+ am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers();
+ while (containers.size() != NUM_CONTAINERS) {
+ nm1.nodeHeartbeat(true);
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers());
+ Thread.sleep(200);
+ }
+
+ // launch the 2nd container, for testing running container transferred.
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+ ContainerId containerId2 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+ // launch the 3rd container, for testing container allocated by previous
+ // attempt is completed by the next new attempt.
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
+ ContainerId containerId3 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
+ rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
+
+ // 4th container is still in ACQUIRED state, for testing that an acquired
+ // container is always killed.
+ ContainerId containerId4 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 4);
+ rm1.waitForState(nm1, containerId4, RMContainerState.ACQUIRED);
+
+ // 5th container is in Allocated state. for testing allocated container is
+ // always killed.
+ am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+ nm1.nodeHeartbeat(true);
+ ContainerId containerId5 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 5);
+ rm1.waitForContainerAllocated(nm1, containerId5);
+ rm1.waitForState(nm1, containerId5, RMContainerState.ALLOCATED);
+
+ // 6th container is in Reserved state.
+ am1.allocate("127.0.0.1", 6000, 1, new ArrayList<ContainerId>());
+ ContainerId containerId6 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 6);
+ nm1.nodeHeartbeat(true);
+ SchedulerApplicationAttempt schedulerAttempt =
+ ((CapacityScheduler) rm1.getResourceScheduler())
+ .getCurrentAttemptForContainer(containerId6);
+ while (schedulerAttempt.getReservedContainers().size() == 0) {
+ System.out.println("Waiting for container " + containerId6
+ + " to be reserved.");
+ nm1.nodeHeartbeat(true);
+ Thread.sleep(200);
+ }
+ // assert containerId6 is reserved.
+ Assert.assertEquals(containerId6, schedulerAttempt.getReservedContainers()
+ .get(0).getContainerId());
+
+ // fail the AM by sending CONTAINER_FINISHED event without registering.
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am1.waitForState(RMAppAttemptState.FAILED);
+
+ // wait for some time. previous AM's running containers should still remain
+ // in scheduler even though am failed
+ Thread.sleep(3000);
+ rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+ // acquired/allocated containers are cleaned up.
+ Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId4));
+ Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId5));
+
+ // wait for app to start a new attempt.
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ // assert this is a new AM.
+ ApplicationAttemptId newAttemptId =
+ app1.getCurrentAppAttempt().getAppAttemptId();
+ Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
+ MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
+
+ // complete container by sending the container complete event which has earlier
+ // attempt's attemptId
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE);
+ rm1.waitForState(nm1, containerId3, RMContainerState.COMPLETED);
+
+ // Even though the completed container containerId3 event was sent to the
+ // earlier failed attempt, new RMAppAttempt can also capture this container
+ // info.
+ // completed containerId4 is also transferred to the new attempt.
+ RMAppAttempt newAttempt =
+ app1.getRMAppAttempt(am2.getApplicationAttemptId());
+ // 4 containers finished, acquired/allocated/reserved/completed.
+ Assert.assertEquals(4, newAttempt.getJustFinishedContainers().size());
+ boolean container3Exists = false, container4Exists = false, container5Exists =
+ false, container6Exists = false;
+ for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
+ if(status.getContainerId().equals(containerId3)) {
+ // containerId3 is the container run by the previous attempt but finished
+ // by the new attempt.
+ container3Exists = true;
+ }
+ if (status.getContainerId().equals(containerId4)) {
+ // containerId4 is the Acquired Container killed by the previous attempt,
+ // it's now inside new attempt's finished container list.
+ container4Exists = true;
+ }
+ if (status.getContainerId().equals(containerId5)) {
+ // containerId5 is the Allocated container killed by previous failed attempt.
+ container5Exists = true;
+ }
+ if (status.getContainerId().equals(containerId6)) {
+ // containerId6 is the reserved container killed by previous failed attempt.
+ container6Exists = true;
+ }
+ }
+ Assert.assertTrue(container3Exists && container4Exists && container5Exists
+ && container6Exists);
+
+ // New SchedulerApplicationAttempt also has the containers info.
+ rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+ // record the scheduler attempt for testing.
+ SchedulerApplicationAttempt schedulerNewAttempt =
+ ((CapacityScheduler) rm1.getResourceScheduler())
+ .getCurrentAttemptForContainer(containerId2);
+ // finish this application
+ MockRM.finishApplicationMaster(app1, rm1, nm1, am2);
+
+ // the 2nd attempt released the 1st attempt's running container, when the
+ // 2nd attempt finishes.
+ Assert.assertFalse(schedulerNewAttempt.getLiveContainers().contains(
+ containerId2));
+ // 5 containers are now finished: the 4 asserted above plus one more.
+ Assert.assertEquals(5, newAttempt.getJustFinishedContainers().size());
+
+ rm1.stop();
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Fri Jan 10 15:05:18 2014
@@ -460,7 +460,7 @@ public class TestRMAppTransitions {
LOG.info("--- START: testUnmanagedAppFailPath ---");
application = testCreateAppRunning(subContext);
RMAppEvent event = new RMAppFailedAttemptEvent(
- application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
+ application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
@@ -582,7 +582,7 @@ public class TestRMAppTransitions {
for (int i=1; i < maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "");
+ RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
assertAppState(RMAppState.ACCEPTED, application);
event =
@@ -598,7 +598,7 @@ public class TestRMAppTransitions {
String message = "Test fail";
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, message);
+ RMAppEventType.ATTEMPT_FAILED, message, false);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -655,7 +655,7 @@ public class TestRMAppTransitions {
for (int i=1; i<maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "");
+ RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
assertAppState(RMAppState.ACCEPTED, application);
@@ -680,7 +680,7 @@ public class TestRMAppTransitions {
// after max application attempts
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "");
+ RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -804,7 +804,7 @@ public class TestRMAppTransitions {
// KILLED => KILLED event RMAppEventType.ATTEMPT_FAILED
event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "");
+ RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Fri Jan 10 15:05:18 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -68,10 +69,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -120,14 +121,15 @@ public class TestRMAppAttemptTransitions
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore store;
- private RMApp application;
+ private RMAppImpl application;
private RMAppAttempt applicationAttempt;
private Configuration conf = new Configuration();
private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
spy(new ClientToAMTokenSecretManagerInRM());
-
+ private boolean transferStateFromPreviousAttempt = false;
+
private final class TestApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> {
@@ -150,6 +152,11 @@ public class TestRMAppAttemptTransitions
@Override
public void handle(RMAppEvent event) {
assertEquals(application.getApplicationId(), event.getApplicationId());
+ if (event instanceof RMAppFailedAttemptEvent) {
+ transferStateFromPreviousAttempt =
+ ((RMAppFailedAttemptEvent) event)
+ .getTransferStateFromPreviousAttempt();
+ }
try {
application.handle(event);
} catch (Throwable t) {
@@ -254,10 +261,10 @@ public class TestRMAppAttemptTransitions
unmanagedAM = false;
- application = mock(RMApp.class);
+ application = mock(RMAppImpl.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler,
- masterService, submissionContext, new Configuration());
+ masterService, submissionContext, new Configuration(), false);
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);
@@ -371,6 +378,7 @@ public class TestRMAppAttemptTransitions
assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAttemptFinalStateSaved();
+ assertFalse(transferStateFromPreviousAttempt);
}
/**
@@ -525,6 +533,7 @@ public class TestRMAppAttemptTransitions
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+ assertFalse(transferStateFromPreviousAttempt);
}
@@ -654,6 +663,7 @@ public class TestRMAppAttemptTransitions
diagnostics));
testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
true);
+ assertFalse(transferStateFromPreviousAttempt);
}
private void sendAttemptUpdateSavedEvent(RMAppAttempt applicationAttempt) {
@@ -682,6 +692,21 @@ public class TestRMAppAttemptTransitions
}
@Test
+ public void testUnmanagedAMContainersCleanup() {
+ unmanagedAM = true;
+ when(submissionContext.getUnmanagedAM()).thenReturn(true);
+ when(submissionContext.getKeepContainersAcrossApplicationAttempts())
+ .thenReturn(true);
+ // submit AM and check it goes to SUBMITTED state
+ submitApplicationAttempt();
+ // launch AM and verify attempt failed
+ applicationAttempt.handle(new RMAppAttemptRegistrationEvent(
+ applicationAttempt.getAppAttemptId(), "host", 8042, "oldtrackingurl"));
+ sendAttemptUpdateSavedEvent(applicationAttempt);
+ assertFalse(transferStateFromPreviousAttempt);
+ }
+
+ @Test
public void testNewToKilled() {
applicationAttempt.handle(
new RMAppAttemptEvent(
@@ -1092,6 +1117,64 @@ public class TestRMAppAttemptTransitions
Assert.assertNull(token);
}
+ @Test
+ public void testFailedToFailed() {
+ // create a failed attempt.
+ when(submissionContext.getKeepContainersAcrossApplicationAttempts())
+ .thenReturn(true);
+ Container amContainer = allocateApplicationAttempt();
+ launchApplicationAttempt(amContainer);
+ runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+ ContainerStatus cs1 =
+ ContainerStatus.newInstance(amContainer.getId(),
+ ContainerState.COMPLETE, "some error", 123);
+ ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+ appAttemptId, cs1));
+ sendAttemptUpdateSavedEvent(applicationAttempt);
+ assertEquals(RMAppAttemptState.FAILED,
+ applicationAttempt.getAppAttemptState());
+ // should not kill containers when attempt fails.
+ assertTrue(transferStateFromPreviousAttempt);
+
+ // failed attempt captured the container finished event.
+ assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
+ ContainerStatus cs2 =
+ ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
+ ContainerState.COMPLETE, "", 0);
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+ appAttemptId, cs2));
+ assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(cs2.getContainerId(), applicationAttempt
+ .getJustFinishedContainers().get(0).getContainerId());
+ }
+
+
+ @Test
+ public void testContainersCleanupForLastAttempt() {
+ // create a failed attempt.
+ applicationAttempt =
+ new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), rmContext,
+ scheduler, masterService, submissionContext, new Configuration(),
+ true);
+ when(submissionContext.getKeepContainersAcrossApplicationAttempts())
+ .thenReturn(true);
+ when(submissionContext.getMaxAppAttempts()).thenReturn(1);
+ Container amContainer = allocateApplicationAttempt();
+ launchApplicationAttempt(amContainer);
+ runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+ ContainerStatus cs1 =
+ ContainerStatus.newInstance(amContainer.getId(),
+ ContainerState.COMPLETE, "some error", 123);
+ ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+ appAttemptId, cs1));
+ sendAttemptUpdateSavedEvent(applicationAttempt);
+ assertEquals(RMAppAttemptState.FAILED,
+ applicationAttempt.getAppAttemptState());
+ assertFalse(transferStateFromPreviousAttempt);
+ }
+
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Jan 10 15:05:18 2014
@@ -562,18 +562,18 @@ public class TestCapacityScheduler {
new AppAddedSchedulerEvent(appId, "default", "user");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId);
+ new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
- Assert.assertTrue(cs.getApplication(appAttemptId).isBlacklisted(host));
+ Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
- Assert.assertFalse(cs.getApplication(appAttemptId).isBlacklisted(host));
+ Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
rm.stop();
}
@@ -597,66 +597,6 @@ public class TestCapacityScheduler {
assertTrue(appComparator.compare(app1, app3) < 0);
assertTrue(appComparator.compare(app2, app3) < 0);
}
-
- @Test
- public void testConcurrentAccessOnApplications() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- verifyConcurrentAccessOnApplications(
- cs.appAttempts, FiCaSchedulerApp.class, Queue.class);
- }
-
- public static <T extends SchedulerApplicationAttempt, Q extends Queue>
- void verifyConcurrentAccessOnApplications(
- final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
- final Class<Q> queueClazz)
- throws Exception {
- final int size = 10000;
- final ApplicationId appId = ApplicationId.newInstance(0, 0);
- final Constructor<T> ctor = appClazz.getDeclaredConstructor(
- ApplicationAttemptId.class, String.class, queueClazz,
- ActiveUsersManager.class, RMContext.class);
-
- ApplicationAttemptId appAttemptId0
- = ApplicationAttemptId.newInstance(appId, 0);
- applications.put(appAttemptId0, ctor.newInstance(
- appAttemptId0, null, mock(queueClazz), null, null));
- assertNotNull(applications.get(appAttemptId0));
-
- // Imitating the thread of scheduler that will add and remove apps
- final AtomicBoolean finished = new AtomicBoolean(false);
- final AtomicBoolean failed = new AtomicBoolean(false);
- Thread t = new Thread() {
-
- @Override
- public void run() {
- for (int i = 1; i <= size; ++i) {
- ApplicationAttemptId appAttemptId
- = ApplicationAttemptId.newInstance(appId, i);
- try {
- applications.put(appAttemptId, ctor.newInstance(
- appAttemptId, null, mock(queueClazz), null, null));
- } catch (Exception e) {
- failed.set(true);
- finished.set(true);
- return;
- }
- }
- for (int i = 1; i <= size; ++i) {
- ApplicationAttemptId appAttemptId
- = ApplicationAttemptId.newInstance(appId, i);
- applications.remove(appAttemptId);
- }
- finished.set(true);
- }
- };
- t.start();
-
- // Imitating the thread of rmappattempt that will get the app
- while (!finished.get()) {
- assertNotNull(applications.get(appAttemptId0));
- }
- assertFalse(failed.get());
- }
@Test
public void testGetAppsInQueue() throws Exception {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1557144&r1=1557143&r2=1557144&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Fri Jan 10 15:05:18 2014
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -345,11 +347,16 @@ public class TestLeafQueue {
.getMockApplicationAttemptId(0, 1);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null,
rmContext);
- a.submitApplicationAttempt(app_0, user_0);
-
- when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
+ AppAddedSchedulerEvent addAppEvent =
+ new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(),
+ a.getQueueName(), user_0);
+ cs.handle(addAppEvent);
+ AppAttemptAddedSchedulerEvent addAttemptEvent =
+ new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
+ cs.handle(addAttemptEvent);
+
AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
- appAttemptId_0, RMAppAttemptState.FAILED);
+ appAttemptId_0, RMAppAttemptState.FAILED, false);
cs.handle(event);
assertEquals(0, a.getMetrics().getAppsPending());
@@ -365,9 +372,8 @@ public class TestLeafQueue {
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(1, a.getMetrics().getAppsPending());
- when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
- RMAppAttemptState.FINISHED);
+ RMAppAttemptState.FINISHED, false);
cs.handle(event);
assertEquals(1, a.getMetrics().getAppsSubmitted());