You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/06/25 06:45:50 UTC
svn commit: r1605264 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/h...
Author: jianhe
Date: Wed Jun 25 04:45:50 2014
New Revision: 1605264
URL: http://svn.apache.org/r1605264
Log:
Merge r1605263 from trunk. YARN-1365. Changed ApplicationMasterService to allow an app to re-register after RM restart. Contributed by Anubhav Dhoot
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java
- copied unchanged from r1605263, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Jun 25 04:45:50 2014
@@ -27,6 +27,9 @@ Release 2.5.0 - UNRELEASED
YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe
via junping_du)
+ YARN-1365. Changed ApplicationMasterService to allow an app to re-register
+ after RM restart. (Anubhav Dhoot via jianhe)
+
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java Wed Jun 25 04:45:50 2014
@@ -24,10 +24,8 @@ import org.apache.hadoop.yarn.api.protoc
/**
* This exception is thrown when an ApplicationMaster asks for resources by
- * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries
- * to unregister by calling
- * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
- * API without first registering by calling
+ * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)}
+ * without first registering by calling
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
* or if it tries to register more than once.
*/
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Jun 25 04:45:50 2014
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
@@ -107,12 +108,15 @@ public class ApplicationMasterService ex
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
+ private final AllocateResponse shutdown =
+ recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
+ this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
this.resync.setAMCommand(AMCommand.AM_RESYNC);
this.rmContext = rmContext;
}
@@ -346,9 +350,9 @@ public class ApplicationMasterService ex
AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
message, applicationAttemptId.getApplicationId(),
applicationAttemptId);
- throw new InvalidApplicationMasterRequestException(message);
+ throw new ApplicationMasterNotRegisteredException(message);
}
-
+
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
RMApp rmApp =
@@ -409,22 +413,23 @@ public class ApplicationMasterService ex
AllocateResponseLock lock = responseMap.get(appAttemptId);
if (lock == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
- return resync;
+ return shutdown;
}
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
- "Application Master is trying to allocate before registering for: "
- + appAttemptId.getApplicationId();
- LOG.error(message);
+ "Application Master is not registered for known application: "
+ + appAttemptId.getApplicationId()
+ + ". Let AM resync.";
+ LOG.info(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getUser(), AuditConstants.REGISTER_AM, "",
"ApplicationMasterService", message,
appAttemptId.getApplicationId(),
appAttemptId);
- throw new InvalidApplicationMasterRequestException(message);
+ return resync;
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Jun 25 04:45:50 2014
@@ -899,8 +899,12 @@ public class RMAppAttemptImpl implements
} else {
// Add the current attempt to the scheduler.
if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+ // Need to register an app attempt before AM can register
+ appAttempt.masterService
+ .registerAppAttempt(appAttempt.applicationAttemptId);
+
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
- appAttempt.getAppAttemptId(), false));
+ appAttempt.getAppAttemptId(), false, false));
}
/*
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Jun 25 04:45:50 2014
@@ -557,7 +557,8 @@ public class CapacityScheduler extends
private synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- boolean transferStateFromPreviousAttempt) {
+ boolean transferStateFromPreviousAttempt,
+ boolean shouldNotifyAttemptAdded) {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
@@ -575,9 +576,15 @@ public class CapacityScheduler extends
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
+ queue.getQueueName());
- rmContext.getDispatcher().getEventHandler() .handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ if (shouldNotifyAttemptAdded) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping notifying ATTEMPT_ADDED");
+ }
+ }
}
private synchronized void doneApplication(ApplicationId applicationId,
@@ -911,7 +918,8 @@ public class CapacityScheduler extends
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+ appAttemptAddedEvent.getShouldNotifyAttemptAdded());
}
break;
case APP_ATTEMPT_REMOVED:
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Wed Jun 25 04:45:50 2014
@@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEve
private final ApplicationAttemptId applicationAttemptId;
private final boolean transferStateFromPreviousAttempt;
+ private final boolean shouldNotifyAttemptAdded;
public AppAttemptAddedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
+ this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+ }
+
+ public AppAttemptAddedSchedulerEvent(
+ ApplicationAttemptId applicationAttemptId,
+ boolean transferStateFromPreviousAttempt,
+ boolean shouldNotifyAttemptAdded) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
+ this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
}
public ApplicationAttemptId getApplicationAttemptId() {
@@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEve
public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt;
}
+
+ public boolean getShouldNotifyAttemptAdded() {
+ return shouldNotifyAttemptAdded;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Jun 25 04:45:50 2014
@@ -597,7 +597,8 @@ public class FairScheduler extends
*/
protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- boolean transferStateFromPreviousAttempt) {
+ boolean transferStateFromPreviousAttempt,
+ boolean shouldNotifyAttemptAdded) {
SchedulerApplication<FSSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
@@ -625,9 +626,16 @@ public class FairScheduler extends
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+
+ if (shouldNotifyAttemptAdded) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping notifying ATTEMPT_ADDED");
+ }
+ }
}
/**
@@ -1130,7 +1138,8 @@ public class FairScheduler extends
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+ appAttemptAddedEvent.getShouldNotifyAttemptAdded());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Jun 25 04:45:50 2014
@@ -370,7 +370,8 @@ public class FifoScheduler extends
@VisibleForTesting
public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
- boolean transferStateFromPreviousAttempt) {
+ boolean transferStateFromPreviousAttempt,
+ boolean shouldNotifyAttemptAdded) {
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
@@ -388,9 +389,15 @@ public class FifoScheduler extends
metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(appAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ if (shouldNotifyAttemptAdded) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(appAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping notifying ATTEMPT_ADDED");
+ }
+ }
}
private synchronized void doneApplication(ApplicationId applicationId,
@@ -780,7 +787,8 @@ public class FifoScheduler extends
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+ appAttemptAddedEvent.getShouldNotifyAttemptAdded());
}
break;
case APP_ATTEMPT_REMOVED:
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Wed Jun 25 04:45:50 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -194,28 +195,17 @@ public class TestApplicationMasterLaunch
// request for containers
int request = 2;
- try {
- AllocateResponse ar =
- am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
- } catch (Exception e) {
- Assert.assertEquals("Application Master is trying to allocate before "
- + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
- e.getMessage());
- thrown = true;
- }
+ AllocateResponse ar =
+ am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+ Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
// kick the scheduler
nm1.nodeHeartbeat(true);
- try {
- AllocateResponse amrs =
- am.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>());
- } catch (Exception e) {
- Assert.assertEquals("Application Master is trying to allocate before "
- + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
- e.getMessage());
- thrown = true;
- }
- Assert.assertTrue(thrown);
+ AllocateResponse amrs =
+ am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>());
+ Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
am.registerAppAttempt();
thrown = false;
try {
@@ -228,5 +218,17 @@ public class TestApplicationMasterLaunch
thrown = true;
}
Assert.assertTrue(thrown);
+
+ // Simulate an AM that was disconnected and app attempt was removed
+ // (responseMap does not contain attemptid)
+ am.unregisterAppAttempt();
+ nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+ ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+
+ AllocateResponse amrs2 =
+ am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>());
+ Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Wed Jun 25 04:45:50 2014
@@ -18,60 +18,33 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import com.google.common.collect.Maps;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
import static java.lang.Thread.sleep;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Mockito.*;
public class TestApplicationMasterService {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -270,13 +243,17 @@ public class TestApplicationMasterServic
}
Assert.assertNotNull(cause);
Assert
- .assertTrue(cause instanceof InvalidApplicationMasterRequestException);
+ .assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
Assert.assertNotNull(cause.getMessage());
Assert
.assertTrue(cause
.getMessage()
.contains(
"Application Master is trying to unregister before registering for:"));
+
+ am1.registerAppAttempt();
+
+ am1.unregisterAppAttempt(req, false);
} finally {
if (rm != null) {
rm.stop();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Jun 25 04:45:50 2014
@@ -238,7 +238,7 @@ public class TestFifoScheduler {
}
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
- scheduler.addApplicationAttempt(attId, false);
+ scheduler.addApplicationAttempt(attId, false, true);
rm.stop();
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Jun 25 04:45:50 2014
@@ -293,7 +293,7 @@ public class TestRMRestart {
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
- Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
+ Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -1648,7 +1648,7 @@ public class TestRMRestart {
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
- am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
+ am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Wed Jun 25 04:45:50 2014
@@ -535,6 +535,36 @@ public class TestWorkPreservingRMRestart
assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
}
+ @Test (timeout = 600000)
+ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ // start RM
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create app and launch the AM
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
+
+ // start new RM
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+ rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
+
+ am0.setAMRMProtocol(rm2.getApplicationMasterService());
+ am0.registerAppAttempt(false);
+
+ rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+ rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+ }
+
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
int allocatedContainers, int availableMB, int availableVirtualCores,
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Wed Jun 25 04:45:50 2014
@@ -146,7 +146,7 @@ public class FairSchedulerTestBase {
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
- scheduler.addApplicationAttempt(id, false);
+ scheduler.addApplicationAttempt(id, false, true);
}
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Wed Jun 25 04:45:50 2014
@@ -787,13 +787,13 @@ public class TestFairScheduler extends F
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
- scheduler.addApplicationAttempt(id11, false);
+ scheduler.addApplicationAttempt(id11, false, true);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
- scheduler.addApplicationAttempt(id21, false);
+ scheduler.addApplicationAttempt(id21, false, true);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
- scheduler.addApplicationAttempt(id22, false);
+ scheduler.addApplicationAttempt(id22, false, true);
int minReqSize =
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
@@ -1555,7 +1555,7 @@ public class TestFairScheduler extends F
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
- scheduler.addApplicationAttempt(appId, false);
+ scheduler.addApplicationAttempt(appId, false, true);
// 1 request with 2 nodes on the same rack. another request with 1 node on
// a different rack
@@ -2714,7 +2714,7 @@ public class TestFairScheduler extends F
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
- fs.addApplicationAttempt(appAttemptId, false);
+ fs.addApplicationAttempt(appAttemptId, false, true);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);