You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to yarn-commits@hadoop.apache.org by ju...@apache.org on 2014/08/04 15:25:38 UTC

svn commit: r1615550 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-se...

Author: junping_du
Date: Mon Aug  4 13:25:37 2014
New Revision: 1615550

URL: http://svn.apache.org/r1615550
Log:
YARN-1354. Recover applications upon nodemanager restart. (Contributed by Jason Lowe)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Aug  4 13:25:37 2014
@@ -35,6 +35,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2181. Added preemption info to logs and RM web UI. (Wangda Tan via
     jianhe)
 
+    YARN-1354. Recover applications upon nodemanager restart. (Jason Lowe via 
+    junping_du)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Mon Aug  4 13:25:37 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.apache.hadoop.service.Service.STATE.STARTED;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -42,6 +43,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -119,11 +126,13 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
 
 public class ContainerManagerImpl extends CompositeService implements
     ServiceStateChangeListener, ContainerManagementProtocol,
@@ -224,14 +233,49 @@ public class ContainerManagerImpl extend
     recover();
   }
 
+  @SuppressWarnings("unchecked")
   private void recover() throws IOException, URISyntaxException {
     NMStateStoreService stateStore = context.getNMStateStore();
     if (stateStore.canRecover()) {
       rsrcLocalizationSrvc.recoverLocalizedResources(
           stateStore.loadLocalizationState());
+
+      RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
+      for (ContainerManagerApplicationProto proto :
+           appsState.getApplications()) {
+        recoverApplication(proto);
+      }
+
+      String diagnostic = "Application marked finished during recovery";
+      for (ApplicationId appId : appsState.getFinishedApplications()) {
+        dispatcher.getEventHandler().handle(
+            new ApplicationFinishEvent(appId, diagnostic));
+      }
     }
   }
 
+  private void recoverApplication(ContainerManagerApplicationProto p)
+      throws IOException {
+    ApplicationId appId = new ApplicationIdPBImpl(p.getId());
+    Credentials creds = new Credentials();
+    creds.readTokenStorageStream(
+        new DataInputStream(p.getCredentials().newInput()));
+
+    List<ApplicationACLMapProto> aclProtoList = p.getAclsList();
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(aclProtoList.size());
+    for (ApplicationACLMapProto aclProto : aclProtoList) {
+      acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()),
+          aclProto.getAcl());
+    }
+
+    LOG.info("Recovering application " + appId);
+    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
+        creds, context);
+    context.getApplications().put(appId, app);
+    app.handle(new ApplicationInitEvent(appId, acls));
+  }
+
   protected LogHandler createLogHandler(Configuration conf, Context context,
       DeletionService deletionService) {
     if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@@ -358,6 +402,12 @@ public class ContainerManagerImpl extend
     }
     LOG.info("Applications still running : " + applications.keySet());
 
+    if (this.context.getNMStateStore().canRecover()
+        && !this.context.getDecommissioned()) {
+      // do not cleanup apps as they can be recovered on restart
+      return;
+    }
+
     List<ApplicationId> appIds =
         new ArrayList<ApplicationId>(applications.keySet());
     this.handle(
@@ -567,6 +617,41 @@ public class ContainerManagerImpl extend
       succeededContainers, failedContainers);
   }
 
+  /** Builds the state-store record used to recover an application. */
+  private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
+      String user, Credentials credentials,
+      Map<ApplicationAccessType, String> appAcls) {
+    ContainerManagerApplicationProto.Builder builder =
+        ContainerManagerApplicationProto.newBuilder();
+    builder.setId(((ApplicationIdPBImpl) appId).getProto());
+    builder.setUser(user);
+
+    builder.clearCredentials();
+    if (credentials != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      try {
+        credentials.writeTokenStorageToStream(dob);
+        // getData() is the whole backing array; copy only the written bytes
+        builder.setCredentials(
+            ByteString.copyFrom(dob.getData(), 0, dob.getLength()));
+      } catch (IOException e) {
+        // should not occur
+        LOG.error("Cannot serialize credentials", e);
+      }
+    }
+
+    builder.clearAcls();
+    if (appAcls != null) {
+      for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
+        ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder()
+            .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
+            .setAcl(acl.getValue())
+            .build();
+        builder.addAcls(p);
+      }
+    }
+    return builder.build();
+  }
+
   @SuppressWarnings("unchecked")
   private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
       ContainerTokenIdentifier containerTokenIdentifier,
@@ -640,10 +725,12 @@ public class ContainerManagerImpl extend
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
           LOG.info("Creating a new application reference for app " + applicationID);
-
+          Map<ApplicationAccessType, String> appAcls =
+              container.getLaunchContext().getApplicationACLs();
+          context.getNMStateStore().storeApplication(applicationID,
+              buildAppProto(applicationID, user, credentials, appAcls));
           dispatcher.getEventHandler().handle(
-            new ApplicationInitEvent(applicationID, container.getLaunchContext()
-              .getApplicationACLs()));
+            new ApplicationInitEvent(applicationID, appAcls));
         }
 
         dispatcher.getEventHandler().handle(
@@ -894,6 +981,11 @@ public class ContainerManagerImpl extend
         } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
           diagnostic = "Application killed by ResourceManager";
         }
+        try {
+          this.context.getNMStateStore().storeFinishedApplication(appID);
+        } catch (IOException e) {
+          LOG.error("Unable to update application state in store", e);
+        }
         this.dispatcher.getEventHandler().handle(
             new ApplicationFinishEvent(appID,
                 diagnostic));

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Mon Aug  4 13:25:37 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
+import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
@@ -428,6 +429,11 @@ public class ApplicationImpl implements 
       ApplicationId appId = event.getApplicationID();
       app.context.getApplications().remove(appId);
       app.aclsManager.removeApplication(appId);
+      try {
+        app.context.getNMStateStore().removeApplication(appId);
+      } catch (IOException e) {
+        LOG.error("Unable to remove application from state store", e);
+      }
     }
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Mon Aug  4 13:25:37 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -74,6 +75,11 @@ public class NMLeveldbStateStoreService 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
 
+  private static final String APPLICATIONS_KEY_PREFIX =
+      "ContainerManager/applications/";
+  private static final String FINISHED_APPS_KEY_PREFIX =
+      "ContainerManager/finishedApps/";
+
   private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
   private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
       LOCALIZATION_KEY_PREFIX + "public/";
@@ -117,6 +123,92 @@ public class NMLeveldbStateStoreService 
 
 
   @Override
+  public RecoveredApplicationsState loadApplicationsState()
+      throws IOException {
+    RecoveredApplicationsState state = new RecoveredApplicationsState();
+    state.applications = new ArrayList<ContainerManagerApplicationProto>();
+    String keyPrefix = APPLICATIONS_KEY_PREFIX;
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(keyPrefix));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(keyPrefix)) {
+          break;
+        }
+        state.applications.add(
+            ContainerManagerApplicationProto.parseFrom(entry.getValue()));
+      }
+
+      state.finishedApplications = new ArrayList<ApplicationId>();
+      keyPrefix = FINISHED_APPS_KEY_PREFIX;
+      iter.seek(bytes(keyPrefix));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(keyPrefix)) {
+          break;
+        }
+        ApplicationId appId =
+            ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
+        state.finishedApplications.add(appId);
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+
+    return state;
+  }
+
+  @Override
+  public void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto p) throws IOException {
+    String key = APPLICATIONS_KEY_PREFIX + appId;
+    try {
+      db.put(bytes(key), p.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeFinishedApplication(ApplicationId appId)
+      throws IOException {
+    String key = FINISHED_APPS_KEY_PREFIX + appId;
+    try {
+      db.put(bytes(key), new byte[0]);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId)
+      throws IOException {
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        String key = APPLICATIONS_KEY_PREFIX + appId;
+        batch.delete(bytes(key));
+        key = FINISHED_APPS_KEY_PREFIX + appId;
+        batch.delete(bytes(key));
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+  @Override
   public RecoveredLocalizationState loadLocalizationState()
       throws IOException {
     RecoveredLocalizationState state = new RecoveredLocalizationState();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Mon Aug  4 13:25:37 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -43,6 +44,25 @@ public class NMNullStateStoreService ext
   }
 
   @Override
+  public RecoveredApplicationsState loadApplicationsState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto p) throws IOException {
+  }
+
+  @Override
+  public void storeFinishedApplication(ApplicationId appId) {
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId) throws IOException {
+  }
+
+  @Override
   public RecoveredLocalizationState loadLocalizationState()
       throws IOException {
     throw new UnsupportedOperationException(

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Mon Aug  4 13:25:37 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -45,6 +46,19 @@ public abstract class NMStateStoreServic
     super(name);
   }
 
+  public static class RecoveredApplicationsState {
+    List<ContainerManagerApplicationProto> applications;
+    List<ApplicationId> finishedApplications;
+
+    public List<ContainerManagerApplicationProto> getApplications() {
+      return applications;
+    }
+
+    public List<ApplicationId> getFinishedApplications() {
+      return finishedApplications;
+    }
+  }
+
   public static class LocalResourceTrackerState {
     List<LocalizedResourceProto> localizedResources =
         new ArrayList<LocalizedResourceProto>();
@@ -162,6 +176,19 @@ public abstract class NMStateStoreServic
   }
 
 
+  public abstract RecoveredApplicationsState loadApplicationsState()
+      throws IOException;
+
+  public abstract void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto p) throws IOException;
+
+  public abstract void storeFinishedApplication(ApplicationId appId)
+      throws IOException;
+
+  public abstract void removeApplication(ApplicationId appId)
+      throws IOException;
+
+
   /**
    * Load the state of localized resources
    * @return recovered localized resource state

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Mon Aug  4 13:25:37 2014
@@ -24,6 +24,13 @@ package hadoop.yarn;
 
 import "yarn_protos.proto";
 
+message ContainerManagerApplicationProto {
+  optional ApplicationIdProto id = 1;
+  optional string user = 2;
+  optional bytes credentials = 3;
+  repeated ApplicationACLMapProto acls = 4;
+}
+
 message DeletionServiceDeleteTaskProto {
   optional int32 id = 1;
   optional string user = 2;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Aug  4 13:25:37 2014
@@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -91,8 +93,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 
 @SuppressWarnings("rawtypes")
 public class TestNodeStatusUpdater {

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java?rev=1615550&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java Mon Aug  4 13:25:37 2014
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.junit.Test;
+
+/**
+ * Verifies that the NodeManager container manager recovers applications
+ * (including their ACLs and completion state) from the NM state store
+ * across simulated restarts.
+ */
+public class TestContainerManagerRecovery {
+
+  private final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+
+  @Test
+  public void testApplicationRecovery() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+    conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    Context context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = startContainerManager(context, conf);
+
+    // simulate registration with RM
+    MasterKey masterKey = new MasterKeyPBImpl();
+    masterKey.setKeyId(123);
+    masterKey.setBytes(ByteBuffer.wrap(new byte[] { (byte) 123 }));
+    context.getContainerTokenSecretManager().setMasterKey(masterKey);
+    context.getNMTokenSecretManager().setMasterKey(masterKey);
+
+    // add an application by starting a container
+    String appUser = "app_user1";
+    String modUser = "modify_user1";
+    String viewUser = "view_user1";
+    String enemyUser = "enemy_user";
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newInstance(attemptId, 1);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = Collections.emptyMap();
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+        dob.getLength());
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.MODIFY_APP, modUser);
+    acls.put(ApplicationAccessType.VIEW_APP, viewUser);
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, containerCmds, serviceData,
+        containerTokens, acls);
+    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        clc);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    Application app = context.getApplications().get(appId);
+    assertNotNull(app);
+    waitForAppState(app, ApplicationState.INITING);
+    verifyAppAcls(context, appId, appUser, modUser, viewUser, enemyUser);
+
+    // reset container manager and verify app recovered with proper acls
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = startContainerManager(context, conf);
+    assertEquals(1, context.getApplications().size());
+    app = context.getApplications().get(appId);
+    assertNotNull(app);
+    waitForAppState(app, ApplicationState.INITING);
+    verifyAppAcls(context, appId, appUser, modUser, viewUser, enemyUser);
+
+    // simulate application completion
+    List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
+    finishedApps.add(appId);
+    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
+        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
+
+    // restart and verify app is marked for finishing
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = startContainerManager(context, conf);
+    assertEquals(1, context.getApplications().size());
+    app = context.getApplications().get(appId);
+    assertNotNull(app);
+    waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
+    verifyAppAcls(context, appId, appUser, modUser, viewUser, enemyUser);
+
+    // simulate resource cleanup and then log aggregation completion
+    app.handle(new ApplicationEvent(app.getAppId(),
+        ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+    assertEquals(ApplicationState.FINISHED, app.getApplicationState());
+    app.handle(new ApplicationEvent(app.getAppId(),
+        ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
+
+    // restart and verify app is no longer present after recovery
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = startContainerManager(context, conf);
+    assertTrue(context.getApplications().isEmpty());
+    cm.stop();
+    stateStore.stop();
+  }
+
+  /**
+   * Build a fresh NM context that shares the given state store, which is
+   * how each simulated NM restart in this test obtains recovered state.
+   */
+  private static Context createContext(Configuration conf,
+      NMStateStoreService stateStore) {
+    return new NMContext(new NMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInNM(), null,
+        new ApplicationACLsManager(conf), stateStore);
+  }
+
+  /** Create, initialize and start a container manager for the context. */
+  private ContainerManagerImpl startContainerManager(Context context,
+      Configuration conf) {
+    ContainerManagerImpl cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+    return cm;
+  }
+
+  /**
+   * Assert the application's ACLs: modUser may modify, viewUser may view
+   * but not modify, and enemyUser may not view.
+   */
+  private static void verifyAppAcls(Context context, ApplicationId appId,
+      String appUser, String modUser, String viewUser, String enemyUser) {
+    ApplicationACLsManager aclsManager = context.getApplicationACLsManager();
+    assertTrue(aclsManager.checkAccess(
+        UserGroupInformation.createRemoteUser(modUser),
+        ApplicationAccessType.MODIFY_APP, appUser, appId));
+    assertFalse(aclsManager.checkAccess(
+        UserGroupInformation.createRemoteUser(viewUser),
+        ApplicationAccessType.MODIFY_APP, appUser, appId));
+    assertTrue(aclsManager.checkAccess(
+        UserGroupInformation.createRemoteUser(viewUser),
+        ApplicationAccessType.VIEW_APP, appUser, appId));
+    assertFalse(aclsManager.checkAccess(
+        UserGroupInformation.createRemoteUser(enemyUser),
+        ApplicationAccessType.VIEW_APP, appUser, appId));
+  }
+
+  /**
+   * Start a single container through the container manager, authenticating
+   * as the application attempt with a freshly minted NM token.
+   */
+  private StartContainersResponse startContainer(Context context,
+      final ContainerManagerImpl cm, ContainerId cid,
+      ContainerLaunchContext clc) throws Exception {
+    UserGroupInformation user = UserGroupInformation.createRemoteUser(
+        cid.getApplicationAttemptId().toString());
+    StartContainerRequest scReq = StartContainerRequest.newInstance(
+        clc, TestContainerManager.createContainerToken(cid, 0,
+            context.getNodeId(), user.getShortUserName(),
+            context.getContainerTokenSecretManager()));
+    final List<StartContainerRequest> scReqList =
+        new ArrayList<StartContainerRequest>();
+    scReqList.add(scReq);
+    NMTokenIdentifier nmToken = new NMTokenIdentifier(
+        cid.getApplicationAttemptId(), context.getNodeId(),
+        user.getShortUserName(),
+        context.getNMTokenSecretManager().getCurrentKey().getKeyId());
+    user.addTokenIdentifier(nmToken);
+    return user.doAs(new PrivilegedExceptionAction<StartContainersResponse>() {
+      @Override
+      public StartContainersResponse run() throws Exception {
+        return cm.startContainers(
+            StartContainersRequest.newInstance(scReqList));
+      }
+    });
+  }
+
+  /** Poll (up to 5 seconds) until the app reaches the given state. */
+  private void waitForAppState(Application app, ApplicationState state)
+      throws Exception {
+    final int msecPerSleep = 10;
+    int msecLeft = 5000;
+    while (app.getApplicationState() != state && msecLeft > 0) {
+      Thread.sleep(msecPerSleep);
+      msecLeft -= msecPerSleep;
+    }
+    assertEquals(state, app.getApplicationState());
+  }
+
+  /**
+   * Create a container manager whose log handler, localization service and
+   * launcher are inert stand-ins, so only application recovery is exercised.
+   */
+  private ContainerManagerImpl createContainerManager(Context context) {
+    final LogHandler logHandler = mock(LogHandler.class);
+    final ResourceLocalizationService rsrcSrv =
+        new ResourceLocalizationService(null, null, null, null,
+            context.getNMStateStore()) {
+      @Override
+      public void serviceInit(Configuration conf) throws Exception {
+        // do nothing
+      }
+
+      @Override
+      public void serviceStart() throws Exception {
+        // do nothing
+      }
+
+      @Override
+      public void serviceStop() throws Exception {
+        // do nothing
+      }
+
+      @Override
+      public void handle(LocalizationEvent event) {
+        // do nothing
+      }
+    };
+
+    final ContainersLauncher launcher = new ContainersLauncher(context, null,
+        null, null, null) {
+      @Override
+      public void handle(ContainersLauncherEvent event) {
+        // do nothing
+      }
+    };
+
+    return new ContainerManagerImpl(context,
+        mock(ContainerExecutor.class), mock(DeletionService.class),
+        mock(NodeStatusUpdater.class), metrics,
+        context.getApplicationACLsManager(), null) {
+      @Override
+      protected LogHandler createLogHandler(Configuration conf,
+          Context context, DeletionService deletionService) {
+        return logHandler;
+      }
+
+      @Override
+      protected ResourceLocalizationService createResourceLocalizationService(
+          ContainerExecutor exec, DeletionService deletionContext) {
+        return rsrcSrv;
+      }
+
+      @Override
+      protected ContainersLauncher createContainersLauncher(
+          Context context, ContainerExecutor exec) {
+        return launcher;
+      }
+
+      @Override
+      public void setBlockNewContainerRequests(
+          boolean blockNewContainerRequests) {
+        // do nothing
+      }
+    };
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Mon Aug  4 13:25:37 2014
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -722,6 +723,8 @@ public class TestContainer {
       Context context = mock(Context.class);
       when(context.getApplications()).thenReturn(
           new ConcurrentHashMap<ApplicationId, Application>());
+      NMNullStateStoreService stateStore = new NMNullStateStoreService();
+      when(context.getNMStateStore()).thenReturn(stateStore);
       ContainerExecutor executor = mock(ContainerExecutor.class);
       launcher =
           new ContainersLauncher(context, dispatcher, executor, null, null);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Mon Aug  4 13:25:37 2014
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.no
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -29,12 +31,15 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
+  private Map<ApplicationId, ContainerManagerApplicationProto> apps;
+  private Set<ApplicationId> finishedApps;
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
@@ -44,6 +49,58 @@ public class NMMemoryStateStoreService e
     super(NMMemoryStateStoreService.class.getName());
   }
 
+  @Override
+  protected void initStorage(Configuration conf) {
+    // Every kind of recoverable state starts out as an empty in-memory table.
+    apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
+    finishedApps = new HashSet<ApplicationId>();
+    trackerStates = new HashMap<TrackerKey, TrackerState>();
+    deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+
+    RecoveredNMTokensState tokens = new RecoveredNMTokensState();
+    tokens.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    nmTokenState = tokens;
+
+    RecoveredContainerTokensState containerTokens =
+        new RecoveredContainerTokensState();
+    containerTokens.activeTokens = new HashMap<ContainerId, Long>();
+    containerTokenState = containerTokens;
+  }
+
+  @Override
+  protected void startStorage() {
+    // nothing to start: this store keeps all state in in-memory collections
+  }
+
+  @Override
+  protected void closeStorage() {
+    // nothing to release: the in-memory collections are simply left as-is
+  }
+
+
+  @Override
+  public RecoveredApplicationsState loadApplicationsState()
+      throws IOException {
+    // Hand back copies so callers never see or mutate the live collections.
+    ArrayList<ContainerManagerApplicationProto> appSnapshot =
+        new ArrayList<ContainerManagerApplicationProto>(apps.values());
+    ArrayList<ApplicationId> finishedSnapshot =
+        new ArrayList<ApplicationId>(finishedApps);
+    RecoveredApplicationsState result = new RecoveredApplicationsState();
+    result.applications = appSnapshot;
+    result.finishedApplications = finishedSnapshot;
+    return result;
+  }
+
+  @Override
+  public void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto proto) throws IOException {
+    // Store an independent copy decoded from the serialized bytes rather
+    // than the caller's instance.
+    apps.put(appId,
+        ContainerManagerApplicationProto.parseFrom(proto.toByteString()));
+  }
+
+  @Override
+  public void storeFinishedApplication(ApplicationId appId) {
+    // Only marks the app finished; its stored record is kept until
+    // removeApplication is called.
+    this.finishedApps.add(appId);
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId) throws IOException {
+    // Forget both the finished marker and the stored application record.
+    this.finishedApps.remove(appId);
+    this.apps.remove(appId);
+  }
+
+
   private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
     LocalResourceTrackerState result = new LocalResourceTrackerState();
     result.localizedResources.addAll(ts.localizedResources.values());
@@ -117,25 +174,6 @@ public class NMMemoryStateStoreService e
     }
   }
 
-  @Override
-  protected void initStorage(Configuration conf) {
-    nmTokenState = new RecoveredNMTokensState();
-    nmTokenState.applicationMasterKeys =
-        new HashMap<ApplicationAttemptId, MasterKey>();
-    containerTokenState = new RecoveredContainerTokensState();
-    containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
-    trackerStates = new HashMap<TrackerKey, TrackerState>();
-    deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
-  }
-
-  @Override
-  protected void startStorage() {
-  }
-
-  @Override
-  protected void closeStorage() {
-  }
-
 
   @Override
   public RecoveredDeletionServiceState loadDeletionServiceState()

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Mon Aug  4 13:25:37 2014
@@ -37,13 +37,16 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@@ -142,6 +145,54 @@ public class TestNMLeveldbStateStoreServ
   }
 
   @Test
+  public void testApplicationStorage() throws IOException {
+    // an empty store recovers nothing
+    RecoveredApplicationsState state = stateStore.loadApplicationsState();
+    assertTrue(state.getApplications().isEmpty());
+    assertTrue(state.getFinishedApplications().isEmpty());
+
+    // a stored application survives a restart
+    final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+    ContainerManagerApplicationProto appProto1 =
+        ContainerManagerApplicationProto.newBuilder()
+            .setId(((ApplicationIdPBImpl) appId1).getProto())
+            .setUser("user1")
+            .build();
+    stateStore.storeApplication(appId1, appProto1);
+    restartStateStore();
+    state = stateStore.loadApplicationsState();
+    assertEquals(1, state.getApplications().size());
+    assertEquals(appProto1, state.getApplications().get(0));
+    assertTrue(state.getFinishedApplications().isEmpty());
+
+    // mark the first app finished and store a second one
+    stateStore.storeFinishedApplication(appId1);
+    final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
+    ContainerManagerApplicationProto appProto2 =
+        ContainerManagerApplicationProto.newBuilder()
+            .setId(((ApplicationIdPBImpl) appId2).getProto())
+            .setUser("user2")
+            .build();
+    stateStore.storeApplication(appId2, appProto2);
+    restartStateStore();
+    state = stateStore.loadApplicationsState();
+    assertEquals(2, state.getApplications().size());
+    assertTrue(state.getApplications().contains(appProto1));
+    assertTrue(state.getApplications().contains(appProto2));
+    assertEquals(1, state.getFinishedApplications().size());
+    assertEquals(appId1, state.getFinishedApplications().get(0));
+
+    // finishing then removing the second app leaves only the first
+    stateStore.storeFinishedApplication(appId2);
+    stateStore.removeApplication(appId2);
+    restartStateStore();
+    state = stateStore.loadApplicationsState();
+    assertEquals(1, state.getApplications().size());
+    assertEquals(appProto1, state.getApplications().get(0));
+    assertEquals(1, state.getFinishedApplications().size());
+    assertEquals(appId1, state.getFinishedApplications().get(0));
+  }
+
+  @Test
   public void testStartResourceLocalization() throws IOException {
     String user = "somebody";
     ApplicationId appId = ApplicationId.newInstance(1, 1);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Aug  4 13:25:37 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -312,7 +311,8 @@ public class ResourceTrackerService exte
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeReconnectEvent(nodeId, rmNode));
+          new RMNodeReconnectEvent(nodeId, rmNode,
+              request.getRunningApplications()));
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Aug  4 13:25:37 2014
@@ -1191,6 +1191,9 @@ public class RMAppImpl implements RMApp,
   
   public static boolean isAppInFinalState(RMApp rmApp) {
+    // Prefer the state reported by getRecoveredFinalState(); when that is
+    // null, fall back to the app's current state from getState().
+    // NOTE(review): the cast assumes rmApp is always an RMAppImpl.
     RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
+    if (appState == null) {
+      appState = rmApp.getState();
+    }
+    // FAILED, FINISHED and KILLED are the states treated as final here.
     return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
         || appState == RMAppState.KILLED;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Mon Aug  4 13:25:37 2014
@@ -456,6 +456,24 @@ public class RMNodeImpl implements RMNod
     }
   }
 
+  private static void handleRunningAppOnNode(RMNodeImpl rmNode,
+      RMContext context, ApplicationId appId, NodeId nodeId) {
+    RMApp app = context.getRMApps().get(appId);
+    if (app != null) {
+      // the RM knows this app: report that it is running on this node
+      context.getDispatcher().getEventHandler()
+          .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
+    } else {
+      // The RM has no record of this app, so put it on the node's
+      // finishedApplications list so the app can be cleaned up on the NM.
+      LOG.warn("Cannot get RMApp by appId=" + appId
+          + ", just added it to finishedApplications list for cleanup");
+      rmNode.finishedApplications.add(appId);
+    }
+  }
+
   public static class AddNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
@@ -496,24 +514,6 @@ public class RMNodeImpl implements RMNod
         new NodesListManagerEvent(
             NodesListManagerEventType.NODE_USABLE, rmNode));
     }
-
-    void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
-        ApplicationId appId, NodeId nodeId) {
-      RMApp app = context.getRMApps().get(appId);
-      
-      // if we failed getting app by appId, maybe something wrong happened, just
-      // add the app to the finishedApplications list so that the app can be
-      // cleaned up on the NM
-      if (null == app) {
-        LOG.warn("Cannot get RMApp by appId=" + appId
-            + ", just added it to finishedApplications list for cleanup");
-        rmNode.finishedApplications.add(appId);
-        return;
-      }
-
-      context.getDispatcher().getEventHandler()
-          .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
-    }
   }
 
   public static class ReconnectNodeTransition implements
@@ -526,7 +526,8 @@ public class RMNodeImpl implements RMNod
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeRemovedSchedulerEvent(rmNode));
 
-      RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
+      RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
+      RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
       if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
           && rmNode.getHttpPort() == newNode.getHttpPort()) {
@@ -551,6 +552,13 @@ public class RMNodeImpl implements RMNod
         rmNode.context.getDispatcher().getEventHandler().handle(
             new RMNodeStartedEvent(newNode.getNodeID(), null, null));
       }
+
+      if (null != reconnectEvent.getRunningApplications()) {
+        for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
+          handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
+        }
+      }
+
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodesListManagerEvent(
               NodesListManagerEventType.NODE_USABLE, rmNode));

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Mon Aug  4 13:25:37 2014
@@ -18,17 +18,27 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 
 public class RMNodeReconnectEvent extends RMNodeEvent {
   private RMNode reconnectedNode;
+  private List<ApplicationId> runningApplications;
 
-  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
+      List<ApplicationId> runningApps) {
     super(nodeId, RMNodeEventType.RECONNECTED);
     reconnectedNode = newNode;
+    runningApplications = runningApps;
   }
 
   public RMNode getReconnectedNode() {
     return reconnectedNode;
   }
+
+  public List<ApplicationId> getRunningApplications() {
+    return runningApplications;
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Mon Aug  4 13:25:37 2014
@@ -449,6 +449,35 @@ public class TestApplicationCleanup {
     rm2.stop();
   }
 
+  @Test (timeout = 60000)
+  public void testAppCleanupWhenNMReconnects() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+    // wait for application cleanup message received
+    waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+    // reconnect NM with application still active
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+    rm1.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1615550&r1=1615549&r2=1615550&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Aug  4 13:25:37 2014
@@ -520,7 +520,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
     Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",
@@ -542,7 +542,8 @@ public class TestRMNodeTransitions {
     RMNodeImpl node = getRunningNode(nmVersion1);
     Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
     RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
-    node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode));
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
+        null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
 }