You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/06/25 06:45:50 UTC

svn commit: r1605264 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/h...

Author: jianhe
Date: Wed Jun 25 04:45:50 2014
New Revision: 1605264

URL: http://svn.apache.org/r1605264
Log:
Merge r1605263 from trunk. YARN-1365. Changed ApplicationMasterService to allow an app to re-register after RM restart. Contributed by Anubhav Dhoot

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java
      - copied unchanged from r1605263, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Jun 25 04:45:50 2014
@@ -27,6 +27,9 @@ Release 2.5.0 - UNRELEASED
     YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe
     via junping_du)
 
+    YARN-1365. Changed ApplicationMasterService to allow an app to re-register
+    after RM restart. (Anubhav Dhoot via jianhe)
+
   IMPROVEMENTS
 
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java Wed Jun 25 04:45:50 2014
@@ -24,10 +24,8 @@ import org.apache.hadoop.yarn.api.protoc
 
 /**
  * This exception is thrown when an ApplicationMaster asks for resources by
- * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries
- * to unregister by calling
- * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
- * API without first registering by calling
+ * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)}
+ * without first registering by calling
  * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
  * or if it tries to register more than once.
  */

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Jun 25 04:45:50 2014
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
@@ -107,12 +108,15 @@ public class ApplicationMasterService ex
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
   private final AllocateResponse resync =
       recordFactory.newRecordInstance(AllocateResponse.class);
+  private final AllocateResponse shutdown =
+      recordFactory.newRecordInstance(AllocateResponse.class);
   private final RMContext rmContext;
 
   public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
     super(ApplicationMasterService.class.getName());
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
+    this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
     this.resync.setAMCommand(AMCommand.AM_RESYNC);
     this.rmContext = rmContext;
   }
@@ -346,9 +350,9 @@ public class ApplicationMasterService ex
             AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
             message, applicationAttemptId.getApplicationId(),
             applicationAttemptId);
-        throw new InvalidApplicationMasterRequestException(message);
+        throw new ApplicationMasterNotRegisteredException(message);
       }
-      
+
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
       RMApp rmApp =
@@ -409,22 +413,23 @@ public class ApplicationMasterService ex
     AllocateResponseLock lock = responseMap.get(appAttemptId);
     if (lock == null) {
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
-      return resync;
+      return shutdown;
     }
     synchronized (lock) {
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (!hasApplicationMasterRegistered(appAttemptId)) {
         String message =
-            "Application Master is trying to allocate before registering for: "
-                + appAttemptId.getApplicationId();
-        LOG.error(message);
+            "Application Master is not registered for known application: "
+                + appAttemptId.getApplicationId()
+                + ". Let AM resync.";
+        LOG.info(message);
         RMAuditLogger.logFailure(
             this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
                 .getUser(), AuditConstants.REGISTER_AM, "",
             "ApplicationMasterService", message,
             appAttemptId.getApplicationId(),
             appAttemptId);
-        throw new InvalidApplicationMasterRequestException(message);
+        return resync;
       }
 
       if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Jun 25 04:45:50 2014
@@ -899,8 +899,12 @@ public class RMAppAttemptImpl implements
       } else {
         // Add the current attempt to the scheduler.
         if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+          // Need to register an app attempt before AM can register
+          appAttempt.masterService
+              .registerAppAttempt(appAttempt.applicationAttemptId);
+
           appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false));
+            appAttempt.getAppAttemptId(), false, false));
         }
 
         /*

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Jun 25 04:45:50 2014
@@ -557,7 +557,8 @@ public class CapacityScheduler extends
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
+      boolean transferStateFromPreviousAttempt,
+      boolean shouldNotifyAttemptAdded) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -575,9 +576,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    rmContext.getDispatcher().getEventHandler() .handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-          RMAppAttemptEventType.ATTEMPT_ADDED));
+    if (shouldNotifyAttemptAdded) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptEvent(applicationAttemptId,
+              RMAppAttemptEventType.ATTEMPT_ADDED));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+      }
+    }
   }
 
   private synchronized void doneApplication(ApplicationId applicationId,
@@ -911,7 +918,8 @@ public class CapacityScheduler extends
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
     }
     break;
     case APP_ATTEMPT_REMOVED:

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Wed Jun 25 04:45:50 2014
@@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEve
 
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
+  private final boolean shouldNotifyAttemptAdded;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
+    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+  }
+
+  public AppAttemptAddedSchedulerEvent(
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt,
+      boolean shouldNotifyAttemptAdded) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
+    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEve
   public boolean getTransferStateFromPreviousAttempt() {
     return transferStateFromPreviousAttempt;
   }
+
+  public boolean getShouldNotifyAttemptAdded() {
+    return shouldNotifyAttemptAdded;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Jun 25 04:45:50 2014
@@ -597,7 +597,8 @@ public class FairScheduler extends
    */
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
+      boolean transferStateFromPreviousAttempt,
+      boolean shouldNotifyAttemptAdded) {
     SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
@@ -625,9 +626,16 @@ public class FairScheduler extends
 
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+
+    if (shouldNotifyAttemptAdded) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptEvent(applicationAttemptId,
+              RMAppAttemptEventType.ATTEMPT_ADDED));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+      }
+    }
   }
 
   /**
@@ -1130,7 +1138,8 @@ public class FairScheduler extends
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Jun 25 04:45:50 2014
@@ -370,7 +370,8 @@ public class FifoScheduler extends
   @VisibleForTesting
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
-          boolean transferStateFromPreviousAttempt) {
+          boolean transferStateFromPreviousAttempt,
+          boolean shouldNotifyAttemptAdded) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -388,9 +389,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(appAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+    if (shouldNotifyAttemptAdded) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppAttemptEvent(appAttemptId,
+              RMAppAttemptEventType.ATTEMPT_ADDED));
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+      }
+    }
   }
 
   private synchronized void doneApplication(ApplicationId applicationId,
@@ -780,7 +787,8 @@ public class FifoScheduler extends
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
     }
     break;
     case APP_ATTEMPT_REMOVED:

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Wed Jun 25 04:45:50 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -194,28 +195,17 @@ public class TestApplicationMasterLaunch
 
     // request for containers
     int request = 2;
-    try {
-      AllocateResponse ar =
-          am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
-    } catch (Exception e) {
-      Assert.assertEquals("Application Master is trying to allocate before "
-          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
-        e.getMessage());
-      thrown = true;
-    }
+    AllocateResponse ar =
+        am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+    Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
     // kick the scheduler
     nm1.nodeHeartbeat(true);
-    try {
-      AllocateResponse amrs =
-          am.allocate(new ArrayList<ResourceRequest>(),
-            new ArrayList<ContainerId>());
-    } catch (Exception e) {
-      Assert.assertEquals("Application Master is trying to allocate before "
-          + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
-        e.getMessage());
-      thrown = true;
-    }
-    Assert.assertTrue(thrown);
+    AllocateResponse amrs =
+        am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>());
+    Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
+
     am.registerAppAttempt();
     thrown = false;
     try {
@@ -228,5 +218,17 @@ public class TestApplicationMasterLaunch
       thrown = true;
     }
     Assert.assertTrue(thrown);
+
+    // Simulate an AM that was disconnected and app attempt was removed
+    // (responseMap does not contain attemptid)
+    am.unregisterAppAttempt();
+    nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+        ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+
+    AllocateResponse amrs2 =
+        am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>());
+    Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java Wed Jun 25 04:45:50 2014
@@ -18,60 +18,33 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import com.google.common.collect.Maps;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 
 import static java.lang.Thread.sleep;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Mockito.*;
 
 public class TestApplicationMasterService {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -270,13 +243,17 @@ public class TestApplicationMasterServic
       }
       Assert.assertNotNull(cause);
       Assert
-          .assertTrue(cause instanceof InvalidApplicationMasterRequestException);
+          .assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
       Assert.assertNotNull(cause.getMessage());
       Assert
           .assertTrue(cause
               .getMessage()
               .contains(
                   "Application Master is trying to unregister before registering for:"));
+
+      am1.registerAppAttempt();
+
+      am1.unregisterAppAttempt(req, false);
     } finally {
       if (rm != null) {
         rm.stop();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Jun 25 04:45:50 2014
@@ -238,7 +238,7 @@ public class TestFifoScheduler {
     }
 
     ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
-    scheduler.addApplicationAttempt(attId, false);
+    scheduler.addApplicationAttempt(attId, false, true);
 
     rm.stop();
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Jun 25 04:45:50 2014
@@ -293,7 +293,7 @@ public class TestRMRestart {
     AllocateResponse allocResponse = am1.allocate(
         new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
-    Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
+    Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
     
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -1648,7 +1648,7 @@ public class TestRMRestart {
     rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
     MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
     am1.registerAppAttempt();
-    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); 
+    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
     nm1.nodeHeartbeat(true);
     List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Wed Jun 25 04:45:50 2014
@@ -535,6 +535,36 @@ public class TestWorkPreservingRMRestart
     assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
   }
 
+  @Test (timeout = 600000)
+  public void testAppReregisterOnRMWorkPreservingRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
+
+    // start new RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
+
+    am0.setAMRMProtocol(rm2.getApplicationMasterService());
+    am0.registerAppAttempt(false);
+
+    rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+  }
+
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,
       int allocatedContainers, int availableMB, int availableVirtualCores,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Wed Jun 25 04:45:50 2014
@@ -146,7 +146,7 @@ public class FairSchedulerTestBase {
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
-      scheduler.addApplicationAttempt(id, false);
+      scheduler.addApplicationAttempt(id, false, true);
     }
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1605264&r1=1605263&r2=1605264&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Wed Jun 25 04:45:50 2014
@@ -787,13 +787,13 @@ public class TestFairScheduler extends F
 
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
     scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
-    scheduler.addApplicationAttempt(id11, false);
+    scheduler.addApplicationAttempt(id11, false, true);
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
     scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id21, false);
+    scheduler.addApplicationAttempt(id21, false, true);
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
     scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id22, false);
+    scheduler.addApplicationAttempt(id22, false, true);
 
     int minReqSize = 
         FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
@@ -1555,7 +1555,7 @@ public class TestFairScheduler extends F
     
     ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
-    scheduler.addApplicationAttempt(appId, false);
+    scheduler.addApplicationAttempt(appId, false, true);
     
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // a different rack
@@ -2714,7 +2714,7 @@ public class TestFairScheduler extends F
     ApplicationAttemptId appAttemptId =
             createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
-    fs.addApplicationAttempt(appAttemptId, false);
+    fs.addApplicationAttempt(appAttemptId, false, true);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request =
             createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);