You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:59 UTC

svn commit: r1619019 [4/10] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/had...

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Aug 20 01:34:29 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.apache.hadoop.service.Service.STATE.STARTED;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -42,6 +43,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -119,11 +126,15 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
 
 public class ContainerManagerImpl extends CompositeService implements
     ServiceStateChangeListener, ContainerManagementProtocol,
@@ -224,11 +235,104 @@ public class ContainerManagerImpl extend
     recover();
   }
 
+  @SuppressWarnings("unchecked")
   private void recover() throws IOException, URISyntaxException {
     NMStateStoreService stateStore = context.getNMStateStore();
     if (stateStore.canRecover()) {
       rsrcLocalizationSrvc.recoverLocalizedResources(
           stateStore.loadLocalizationState());
+
+      RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
+      for (ContainerManagerApplicationProto proto :
+           appsState.getApplications()) {
+        recoverApplication(proto);
+      }
+
+      for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
+        recoverContainer(rcs);
+      }
+
+      String diagnostic = "Application marked finished during recovery";
+      for (ApplicationId appId : appsState.getFinishedApplications()) {
+        dispatcher.getEventHandler().handle(
+            new ApplicationFinishEvent(appId, diagnostic));
+      }
+    }
+  }
+
+  private void recoverApplication(ContainerManagerApplicationProto p)
+      throws IOException {
+    ApplicationId appId = new ApplicationIdPBImpl(p.getId());
+    Credentials creds = new Credentials();
+    creds.readTokenStorageStream(
+        new DataInputStream(p.getCredentials().newInput()));
+
+    List<ApplicationACLMapProto> aclProtoList = p.getAclsList();
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(aclProtoList.size());
+    for (ApplicationACLMapProto aclProto : aclProtoList) {
+      acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()),
+          aclProto.getAcl());
+    }
+
+    LOG.info("Recovering application " + appId);
+    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
+        creds, context);
+    context.getApplications().put(appId, app);
+    app.handle(new ApplicationInitEvent(appId, acls));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void recoverContainer(RecoveredContainerState rcs)
+      throws IOException {
+    StartContainerRequest req = rcs.getStartRequest();
+    ContainerLaunchContext launchContext = req.getContainerLaunchContext();
+    ContainerTokenIdentifier token =
+        BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+    ContainerId containerId = token.getContainerID();
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+
+    LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
+        + " with exit code " + rcs.getExitCode());
+
+    if (context.getApplications().containsKey(appId)) {
+      Credentials credentials = parseCredentials(launchContext);
+      Container container = new ContainerImpl(getConfig(), dispatcher,
+          context.getNMStateStore(), req.getContainerLaunchContext(),
+          credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
+          rcs.getDiagnostics(), rcs.getKilled());
+      context.getContainers().put(containerId, container);
+      dispatcher.getEventHandler().handle(
+          new ApplicationContainerInitEvent(container));
+    } else {
+      if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
+        LOG.warn(containerId + " has no corresponding application!");
+      }
+      LOG.info("Adding " + containerId + " to recently stopped containers");
+      nodeStatusUpdater.addCompletedContainer(containerId);
+    }
+  }
+
+  private void waitForRecoveredContainers() throws InterruptedException {
+    final int sleepMsec = 100;
+    int waitIterations = 100;
+    List<ContainerId> newContainers = new ArrayList<ContainerId>();
+    while (--waitIterations >= 0) {
+      newContainers.clear();
+      for (Container container : context.getContainers().values()) {
+        if (container.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) {
+          newContainers.add(container.getContainerId());
+        }
+      }
+      if (newContainers.isEmpty()) {
+        break;
+      }
+      LOG.info("Waiting for containers: " + newContainers);
+      Thread.sleep(sleepMsec);
+    }
+    if (waitIterations < 0) {
+      LOG.warn("Timeout waiting for recovered containers");
     }
   }
 
@@ -265,6 +369,23 @@ public class ContainerManagerImpl extend
     // Enqueue user dirs in deletion context
 
     Configuration conf = getConfig();
+    final InetSocketAddress initialAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_PORT);
+    boolean usingEphemeralPort = (initialAddress.getPort() == 0);
+    if (context.getNMStateStore().canRecover() && usingEphemeralPort) {
+      throw new IllegalArgumentException("Cannot support recovery with an "
+          + "ephemeral server port. Check the setting of "
+          + YarnConfiguration.NM_ADDRESS);
+    }
+    // If recovering, delay opening the RPC service until the recovery of
+    // resources and containers has completed; otherwise requests from
+    // clients during recovery can interfere with the recovery process.
+    final boolean delayedRpcServerStart =
+        context.getNMStateStore().canRecover();
+
     Configuration serverConf = new Configuration(conf);
 
     // always enforce it to be token-based.
@@ -274,11 +395,6 @@ public class ContainerManagerImpl extend
     
     YarnRPC rpc = YarnRPC.create(conf);
 
-    InetSocketAddress initialAddress = conf.getSocketAddr(
-        YarnConfiguration.NM_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_PORT);
-
     server =
         rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, 
             serverConf, this.context.getNMTokenSecretManager(),
@@ -295,16 +411,61 @@ public class ContainerManagerImpl extend
     LOG.info("Blocking new container-requests as container manager rpc" +
     		" server is still starting.");
     this.setBlockNewContainerRequests(true);
-    server.start();
-    InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
-    NodeId nodeId = NodeId.newInstance(
-        connectAddress.getAddress().getCanonicalHostName(),
-        connectAddress.getPort());
+
+    String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
+    String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
+    String hostOverride = null;
+    if (bindHost != null && !bindHost.isEmpty()
+        && nmAddress != null && !nmAddress.isEmpty()) {
+      // Bind-host case with an explicit address: to override the first
+      // hostname found when querying for our hostname, combine the host
+      // part of the specified address with the actual port the server
+      // listens on.
+      hostOverride = nmAddress.split(":")[0];
+    }
+
+    // setup node ID
+    InetSocketAddress connectAddress;
+    if (delayedRpcServerStart) {
+      connectAddress = NetUtils.getConnectAddress(initialAddress);
+    } else {
+      server.start();
+      connectAddress = NetUtils.getConnectAddress(server);
+    }
+    NodeId nodeId = buildNodeId(connectAddress, hostOverride);
     ((NodeManager.NMContext)context).setNodeId(nodeId);
     this.context.getNMTokenSecretManager().setNodeId(nodeId);
     this.context.getContainerTokenSecretManager().setNodeId(nodeId);
-    LOG.info("ContainerManager started at " + connectAddress);
+
+    // start remaining services
     super.serviceStart();
+
+    if (delayedRpcServerStart) {
+      waitForRecoveredContainers();
+      server.start();
+
+      // check that the node ID is as previously advertised
+      connectAddress = NetUtils.getConnectAddress(server);
+      NodeId serverNode = buildNodeId(connectAddress, hostOverride);
+      if (!serverNode.equals(nodeId)) {
+        throw new IOException("Node mismatch after server started, expected '"
+            + nodeId + "' but found '" + serverNode + "'");
+      }
+    }
+
+    LOG.info("ContainerManager started at " + connectAddress);
+    LOG.info("ContainerManager bound to " + initialAddress);
+  }
+
+  private NodeId buildNodeId(InetSocketAddress connectAddress,
+      String hostOverride) {
+    if (hostOverride != null) {
+      connectAddress = NetUtils.getConnectAddress(
+          new InetSocketAddress(hostOverride, connectAddress.getPort()));
+    }
+    return NodeId.newInstance(
+        connectAddress.getAddress().getCanonicalHostName(),
+        connectAddress.getPort());
   }
 
   void refreshServiceAcls(Configuration configuration, 
@@ -341,6 +502,12 @@ public class ContainerManagerImpl extend
     }
     LOG.info("Applications still running : " + applications.keySet());
 
+    if (this.context.getNMStateStore().canRecover()
+        && !this.context.getDecommissioned()) {
+      // do not cleanup apps as they can be recovered on restart
+      return;
+    }
+
     List<ApplicationId> appIds =
         new ArrayList<ApplicationId>(applications.keySet());
     this.handle(
@@ -497,6 +664,8 @@ public class ContainerManagerImpl extend
       messageBuilder.append("\nThis token is expired. current time is ")
         .append(System.currentTimeMillis()).append(" found ")
         .append(containerTokenIdentifier.getExpiryTimeStamp());
+      messageBuilder.append("\nNote: System times on machines may be out of sync.")
+        .append(" Check system time and time zones.");
     }
     if (unauthorized) {
       String msg = messageBuilder.toString();
@@ -548,6 +717,41 @@ public class ContainerManagerImpl extend
       succeededContainers, failedContainers);
   }
 
+  private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
+      String user, Credentials credentials,
+      Map<ApplicationAccessType, String> appAcls) {
+
+    ContainerManagerApplicationProto.Builder builder =
+        ContainerManagerApplicationProto.newBuilder();
+    builder.setId(((ApplicationIdPBImpl) appId).getProto());
+    builder.setUser(user);
+
+    builder.clearCredentials();
+    if (credentials != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      try {
+        credentials.writeTokenStorageToStream(dob);
+        builder.setCredentials(ByteString.copyFrom(dob.getData()));
+      } catch (IOException e) {
+        // should not occur
+        LOG.error("Cannot serialize credentials", e);
+      }
+    }
+
+    builder.clearAcls();
+    if (appAcls != null) {
+      for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
+        ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder()
+            .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
+            .setAcl(acl.getValue())
+            .build();
+        builder.addAcls(p);
+      }
+    }
+
+    return builder.build();
+  }
+
   @SuppressWarnings("unchecked")
   private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
       ContainerTokenIdentifier containerTokenIdentifier,
@@ -600,7 +804,8 @@ public class ContainerManagerImpl extend
     Credentials credentials = parseCredentials(launchContext);
 
     Container container =
-        new ContainerImpl(getConfig(), this.dispatcher, launchContext,
+        new ContainerImpl(getConfig(), this.dispatcher,
+            context.getNMStateStore(), launchContext,
           credentials, metrics, containerTokenIdentifier);
     ApplicationId applicationID =
         containerId.getApplicationAttemptId().getApplicationId();
@@ -621,12 +826,15 @@ public class ContainerManagerImpl extend
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
           LOG.info("Creating a new application reference for app " + applicationID);
-
+          Map<ApplicationAccessType, String> appAcls =
+              container.getLaunchContext().getApplicationACLs();
+          context.getNMStateStore().storeApplication(applicationID,
+              buildAppProto(applicationID, user, credentials, appAcls));
           dispatcher.getEventHandler().handle(
-            new ApplicationInitEvent(applicationID, container.getLaunchContext()
-              .getApplicationACLs()));
+            new ApplicationInitEvent(applicationID, appAcls));
         }
 
+        this.context.getNMStateStore().storeContainer(containerId, request);
         dispatcher.getEventHandler().handle(
           new ApplicationContainerInitEvent(container));
 
@@ -674,7 +882,7 @@ public class ContainerManagerImpl extend
   }
 
   private Credentials parseCredentials(ContainerLaunchContext launchContext)
-      throws YarnException {
+      throws IOException {
     Credentials credentials = new Credentials();
     // //////////// Parse credentials
     ByteBuffer tokens = launchContext.getTokens();
@@ -683,15 +891,11 @@ public class ContainerManagerImpl extend
       DataInputByteBuffer buf = new DataInputByteBuffer();
       tokens.rewind();
       buf.reset(tokens);
-      try {
-        credentials.readTokenStorageStream(buf);
-        if (LOG.isDebugEnabled()) {
-          for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
-            LOG.debug(tk.getService() + " = " + tk.toString());
-          }
+      credentials.readTokenStorageStream(buf);
+      if (LOG.isDebugEnabled()) {
+        for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
+          LOG.debug(tk.getService() + " = " + tk.toString());
         }
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
       }
     }
     // //////////// End of parsing credentials
@@ -724,7 +928,7 @@ public class ContainerManagerImpl extend
 
   @SuppressWarnings("unchecked")
   private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
-      ContainerId containerID) throws YarnException {
+      ContainerId containerID) throws YarnException, IOException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
     LOG.info("Stopping container with container Id: " + containerIDStr);
@@ -737,6 +941,7 @@ public class ContainerManagerImpl extend
           + " is not handled by this NodeManager");
       }
     } else {
+      context.getNMStateStore().storeContainerKilled(containerID);
       dispatcher.getEventHandler().handle(
         new ContainerKillEvent(containerID,
             ContainerExitStatus.KILLED_BY_APPMASTER,
@@ -875,6 +1080,11 @@ public class ContainerManagerImpl extend
         } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
           diagnostic = "Application killed by ResourceManager";
         }
+        try {
+          this.context.getNMStateStore().storeFinishedApplication(appID);
+        } catch (IOException e) {
+          LOG.error("Unable to update application state in store", e);
+        }
         this.dispatcher.getEventHandler().handle(
             new ApplicationFinishEvent(appID,
                 diagnostic));

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Wed Aug 20 01:34:29 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
+import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
@@ -428,6 +429,11 @@ public class ApplicationImpl implements 
       ApplicationId appId = event.getApplicationID();
       app.context.getApplications().remove(appId);
       app.aclsManager.removeApplication(appId);
+      try {
+        app.context.getNMStateStore().removeApplication(appId);
+      } catch (IOException e) {
+        LOG.error("Unable to remove application from state store", e);
+      }
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed Aug 20 01:34:29 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -75,6 +78,7 @@ public class ContainerImpl implements Co
   private final Lock readLock;
   private final Lock writeLock;
   private final Dispatcher dispatcher;
+  private final NMStateStoreService stateStore;
   private final Credentials credentials;
   private final NodeManagerMetrics metrics;
   private final ContainerLaunchContext launchContext;
@@ -101,12 +105,19 @@ public class ContainerImpl implements Co
   private final List<LocalResourceRequest> appRsrcs =
     new ArrayList<LocalResourceRequest>();
 
+  // status the container was recovered with after a restart
+  private RecoveredContainerStatus recoveredStatus =
+      RecoveredContainerStatus.REQUESTED;
+  // whether container was marked as killed after recovery
+  private boolean recoveredAsKilled = false;
+
   public ContainerImpl(Configuration conf, Dispatcher dispatcher,
-      ContainerLaunchContext launchContext, Credentials creds,
-      NodeManagerMetrics metrics,
+      NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+      Credentials creds, NodeManagerMetrics metrics,
       ContainerTokenIdentifier containerTokenIdentifier) {
     this.daemonConf = conf;
     this.dispatcher = dispatcher;
+    this.stateStore = stateStore;
     this.launchContext = launchContext;
     this.containerTokenIdentifier = containerTokenIdentifier;
     this.containerId = containerTokenIdentifier.getContainerID();
@@ -122,6 +133,21 @@ public class ContainerImpl implements Co
     stateMachine = stateMachineFactory.make(this);
   }
 
+  // constructor for a recovered container
+  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+      NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+      Credentials creds, NodeManagerMetrics metrics,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      RecoveredContainerStatus recoveredStatus, int exitCode,
+      String diagnostics, boolean wasKilled) {
+    this(conf, dispatcher, stateStore, launchContext, creds, metrics,
+        containerTokenIdentifier);
+    this.recoveredStatus = recoveredStatus;
+    this.exitCode = exitCode;
+    this.recoveredAsKilled = wasKilled;
+    this.diagnostics.append(diagnostics);
+  }
+
   private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
     new ContainerDoneTransition();
 
@@ -135,8 +161,10 @@ public class ContainerImpl implements Co
       new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
     // From NEW State
     .addTransition(ContainerState.NEW,
-        EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
-            ContainerState.LOCALIZATION_FAILED),
+        EnumSet.of(ContainerState.LOCALIZING,
+            ContainerState.LOCALIZED,
+            ContainerState.LOCALIZATION_FAILED,
+            ContainerState.DONE),
         ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
     .addTransition(ContainerState.NEW, ContainerState.NEW,
         ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@@ -281,7 +309,9 @@ public class ContainerImpl implements Co
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
         ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -295,7 +325,9 @@ public class ContainerImpl implements Co
     // we notify container of failed localization if localizer thread (for
     // that container) fails for some reason
     .addTransition(ContainerState.DONE, ContainerState.DONE,
-        ContainerEventType.RESOURCE_FAILED)
+        EnumSet.of(ContainerEventType.RESOURCE_FAILED,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
 
     // create the topology tables
     .installTopology();
@@ -420,7 +452,7 @@ public class ContainerImpl implements Co
     }
   }
 
-  @SuppressWarnings({"fallthrough", "unchecked"})
+  @SuppressWarnings("fallthrough")
   private void finished() {
     ApplicationId applicationId =
         containerId.getApplicationAttemptId().getApplicationId();
@@ -458,7 +490,11 @@ public class ContainerImpl implements Co
     }
 
     metrics.releaseContainer(this.resource);
+    sendFinishedEvents();
+  }
 
+  @SuppressWarnings("unchecked")
+  private void sendFinishedEvents() {
     // Inform the application
     @SuppressWarnings("rawtypes")
     EventHandler eventHandler = dispatcher.getEventHandler();
@@ -471,6 +507,45 @@ public class ContainerImpl implements Co
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendLaunchEvent() {
+    ContainersLauncherEventType launcherEvent =
+        ContainersLauncherEventType.LAUNCH_CONTAINER;
+    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
+      // try to recover a container that was previously launched
+      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+    }
+    dispatcher.getEventHandler().handle(
+        new ContainersLauncherEvent(this, launcherEvent));
+  }
+
+  // Inform the ContainersMonitor to start monitoring the container's
+  // resource usage.
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendContainerMonitorStartEvent() {
+      long pmemBytes = getResource().getMemory() * 1024 * 1024L;
+      float pmemRatio = daemonConf.getFloat(
+          YarnConfiguration.NM_VMEM_PMEM_RATIO,
+          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+      long vmemBytes = (long) (pmemRatio * pmemBytes);
+
+      dispatcher.getEventHandler().handle(
+          new ContainerStartMonitoringEvent(containerId,
+              vmemBytes, pmemBytes));
+  }
+
+  private void addDiagnostics(String... diags) {
+    for (String s : diags) {
+      this.diagnostics.append(s);
+    }
+    try {
+      stateStore.storeContainerDiagnostics(containerId, diagnostics);
+    } catch (IOException e) {
+      LOG.warn("Unable to update diagnostics in state store for "
+          + containerId, e);
+    }
+  }
+
+  @SuppressWarnings("unchecked") // dispatcher not typed
   public void cleanup() {
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
       new HashMap<LocalResourceVisibility, 
@@ -518,6 +593,16 @@ public class ContainerImpl implements Co
     @Override
     public ContainerState transition(ContainerImpl container,
         ContainerEvent event) {
+      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
+        container.sendFinishedEvents();
+        return ContainerState.DONE;
+      } else if (container.recoveredAsKilled &&
+          container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
+        // container was killed but never launched
+        container.finished();
+        return ContainerState.DONE;
+      }
+
       final ContainerLaunchContext ctxt = container.launchContext;
       container.metrics.initingContainer();
 
@@ -593,9 +678,7 @@ public class ContainerImpl implements Co
               new ContainerLocalizationRequestEvent(container, req));
         return ContainerState.LOCALIZING;
       } else {
-        container.dispatcher.getEventHandler().handle(
-            new ContainersLauncherEvent(container,
-                ContainersLauncherEventType.LAUNCH_CONTAINER));
+        container.sendLaunchEvent();
         container.metrics.endInitingContainer();
         return ContainerState.LOCALIZED;
       }
@@ -606,7 +689,6 @@ public class ContainerImpl implements Co
    * Transition when one of the requested resources for this container
    * has been successfully localized.
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
     @Override
@@ -626,9 +708,8 @@ public class ContainerImpl implements Co
       if (!container.pendingResources.isEmpty()) {
         return ContainerState.LOCALIZING;
       }
-      container.dispatcher.getEventHandler().handle(
-          new ContainersLauncherEvent(container,
-              ContainersLauncherEventType.LAUNCH_CONTAINER));
+
+      container.sendLaunchEvent();
       container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
@@ -638,24 +719,22 @@ public class ContainerImpl implements Co
    * Transition from LOCALIZED state to RUNNING state upon receiving
    * a CONTAINER_LAUNCHED event
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LaunchTransition extends ContainerTransition {
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
-      // Inform the ContainersMonitor to start monitoring the container's
-      // resource usage.
-      long pmemBytes =
-          container.getResource().getMemory() * 1024 * 1024L;
-      float pmemRatio = container.daemonConf.getFloat(
-          YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      long vmemBytes = (long) (pmemRatio * pmemBytes);
-      
-      container.dispatcher.getEventHandler().handle(
-          new ContainerStartMonitoringEvent(container.containerId,
-              vmemBytes, pmemBytes));
+      container.sendContainerMonitorStartEvent();
       container.metrics.runningContainer();
       container.wasLaunched  = true;
+
+      if (container.recoveredAsKilled) {
+        LOG.info("Killing " + container.containerId
+            + " due to recovered as killed");
+        container.addDiagnostics("Container recovered as killed.\n");
+        container.dispatcher.getEventHandler().handle(
+            new ContainersLauncherEvent(container,
+                ContainersLauncherEventType.CLEANUP_CONTAINER));
+      }
     }
   }
 
@@ -707,8 +786,7 @@ public class ContainerImpl implements Co
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
       container.exitCode = exitEvent.getExitCode();
       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-          .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }
 
       // TODO: Add containerWorkDir to the deletion service.
@@ -735,7 +813,7 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       super.transition(container, event);
-      container.diagnostics.append("Killed by external signal\n");
+      container.addDiagnostics("Killed by external signal\n");
     }
   }
 
@@ -750,9 +828,7 @@ public class ContainerImpl implements Co
 
       ContainerResourceFailedEvent rsrcFailedEvent =
           (ContainerResourceFailedEvent) event;
-      container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage()
-          + "\n");
-          
+      container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n");
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
@@ -775,8 +851,8 @@ public class ContainerImpl implements Co
       container.metrics.endInitingContainer();
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.exitCode = killEvent.getContainerExitStatus();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
     }
   }
 
@@ -817,7 +893,7 @@ public class ContainerImpl implements Co
           new ContainersLauncherEvent(container,
               ContainersLauncherEventType.CLEANUP_CONTAINER));
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
       container.exitCode = killEvent.getContainerExitStatus();
     }
   }
@@ -836,8 +912,7 @@ public class ContainerImpl implements Co
       }
 
       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-          .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }
 
       // The process/process-grp is killed. Decrement reference counts and
@@ -877,8 +952,8 @@ public class ContainerImpl implements Co
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.exitCode = killEvent.getContainerExitStatus();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
       super.transition(container, event);
     }
   }
@@ -892,8 +967,14 @@ public class ContainerImpl implements Co
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerDiagnosticsUpdateEvent updateEvent =
           (ContainerDiagnosticsUpdateEvent) event;
-      container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
-          .append("\n");
+      container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n");
+      try {
+        container.stateStore.storeContainerDiagnostics(container.containerId,
+            container.diagnostics);
+      } catch (IOException e) {
+        LOG.warn("Unable to update state store diagnostics for "
+            + container.containerId, e);
+      }
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Aug 20 01:34:29 2014
@@ -87,22 +87,23 @@ public class ContainerLaunch implements 
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
 
   private static final String PID_FILE_NAME_FMT = "%s.pid";
+  private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
 
-  private final Dispatcher dispatcher;
-  private final ContainerExecutor exec;
+  protected final Dispatcher dispatcher;
+  protected final ContainerExecutor exec;
   private final Application app;
-  private final Container container;
+  protected final Container container;
   private final Configuration conf;
   private final Context context;
   private final ContainerManagerImpl containerManager;
   
-  private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
-  private volatile AtomicBoolean completed = new AtomicBoolean(false);
+  protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+  protected AtomicBoolean completed = new AtomicBoolean(false);
 
   private long sleepDelayBeforeSigKill = 250;
   private long maxKillWaitTime = 2000;
 
-  private Path pidFilePath = null;
+  protected Path pidFilePath = null;
 
   private final LocalDirsHandlerService dirsHandler;
 
@@ -223,14 +224,11 @@ public class ContainerLaunch implements 
               + Path.SEPARATOR + containerIdStr,
               LocalDirAllocator.SIZE_UNKNOWN, false);
 
-      String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
-          containerIdStr);
+      String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
 
       // pid file should be in nm private dir so that it is not 
       // accessible by users
-      pidFilePath = dirsHandler.getLocalPathForWrite(
-          ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR 
-          + pidFileSuffix);
+      pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
       List<String> localDirs = dirsHandler.getLocalDirs();
       List<String> logDirs = dirsHandler.getLogDirs();
 
@@ -288,6 +286,7 @@ public class ContainerLaunch implements 
       dispatcher.getEventHandler().handle(new ContainerEvent(
             containerID,
             ContainerEventType.CONTAINER_LAUNCHED));
+      context.getNMStateStore().storeContainerLaunched(containerID);
 
       // Check if the container is signalled to be killed.
       if (!shouldLaunchContainer.compareAndSet(false, true)) {
@@ -310,6 +309,11 @@ public class ContainerLaunch implements 
     } finally {
       completed.set(true);
       exec.deactivateContainer(containerID);
+      try {
+        context.getNMStateStore().storeContainerCompleted(containerID, ret);
+      } catch (IOException e) { // best-effort store; keep the cause in the log
+        LOG.error("Unable to set exit code for container " + containerID, e);
+      }
     }
 
     if (LOG.isDebugEnabled()) {
@@ -342,6 +346,11 @@ public class ContainerLaunch implements 
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
     return 0;
   }
+
+  protected String getPidFileSubpath(String appIdStr, String containerIdStr) { // <container private dir>/<containerIdStr>.pid
+    return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+        + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
+  }
   
   /**
    * Cleanup the container.
@@ -357,6 +366,13 @@ public class ContainerLaunch implements 
     String containerIdStr = ConverterUtils.toString(containerId);
     LOG.info("Cleaning up container " + containerIdStr);
 
+    try {
+      context.getNMStateStore().storeContainerKilled(containerId);
+    } catch (IOException e) {
+      LOG.error("Unable to mark container " + containerId
+          + " killed in store", e);
+    }
+
     // launch flag will be set to true if process already launched
     boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
     if (!alreadyLaunched) {
@@ -421,6 +437,7 @@ public class ContainerLaunch implements 
       if (pidFilePath != null) {
         FileContext lfs = FileContext.getLocalFSFileContext();
         lfs.delete(pidFilePath, false);
+        lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
       }
     }
   }
@@ -479,6 +496,10 @@ public class ContainerLaunch implements 
         + appIdStr;
   }
 
+  Context getContext() { // package-private; presumably for launcher classes in this package -- confirm
+    return context;
+  }
+
   @VisibleForTesting
   static abstract class ShellScriptBuilder {
     public static ShellScriptBuilder create() {
@@ -787,4 +808,7 @@ public class ContainerLaunch implements 
     }
   }
 
+  public static String getExitCodeFile(String pidFile) { // returns pidFile with ".exitcode" appended
+    return pidFile + EXIT_CODE_FILE_SUFFIX;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Wed Aug 20 01:34:29 2014
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -107,7 +101,6 @@ public class ContainersLauncher extends 
     super.serviceStop();
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainersLauncherEvent event) {
     // TODO: ContainersLauncher launches containers one by one!!
@@ -125,6 +118,14 @@ public class ContainersLauncher extends 
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
+      case RECOVER_CONTAINER:
+        app = context.getApplications().get(
+            containerId.getApplicationAttemptId().getApplicationId());
+        launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
+            exec, app, event.getContainer(), dirsHandler, containerManager);
+        containerLauncher.submit(launch);
+        running.put(containerId, launch);
+        break;
       case CLEANUP_CONTAINER:
         ContainerLaunch launcher = running.remove(containerId);
         if (launcher == null) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Wed Aug 20 01:34:29 2014
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
 
 public enum ContainersLauncherEventType {
   LAUNCH_CONTAINER,
+  RECOVER_CONTAINER, // handled by ContainersLauncher with a RecoveredContainerLaunch
   CLEANUP_CONTAINER, // The process(grp) itself.
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Aug 20 01:34:29 2014
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
@@ -251,6 +252,7 @@ public class ResourceLocalizationService
     cacheCleanupPeriod =
       conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
     localizationServerAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
         YarnConfiguration.NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
@@ -341,7 +343,9 @@ public class ResourceLocalizationService
     server = createServer();
     server.start();
     localizationServerAddress =
-        getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+        getConfig().updateConnectAddr(YarnConfiguration.NM_BIND_HOST,
+                                      YarnConfiguration.NM_LOCALIZER_ADDRESS,
+                                      YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
                                       server.getListenerAddress());
     LOG.info("Localizer started on port " + server.getPort());
     super.serviceStart();

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Wed Aug 20 01:34:29 2014
@@ -25,5 +25,7 @@ public interface AppLogAggregator extend
   void startContainerLogAggregation(ContainerId containerId,
       boolean wasContainerSuccessful);
 
+  void abortLogAggregation();
+
   void finishLogAggregation();
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Aug 20 01:34:29 2014
@@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implem
   private final BlockingQueue<ContainerId> pendingContainers;
   private final AtomicBoolean appFinishing = new AtomicBoolean();
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+  private final AtomicBoolean aborted = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;
 
   private LogWriter writer = null;
@@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implem
   private void doAppLogAggregation() {
     ContainerId containerId;
 
-    while (!this.appFinishing.get()) {
+    while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
           wait(THREAD_SLEEP_TIME);
@@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implem
       }
     }
 
+    if (this.aborted.get()) {
+      return;
+    }
+
     // Application is finished. Finish pending-containers
     while ((containerId = this.pendingContainers.poll()) != null) {
       uploadLogsForContainer(containerId);
@@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implem
     this.appFinishing.set(true);
     this.notifyAll();
   }
+
+  @Override
+  public synchronized void abortLogAggregation() {
+    LOG.info("Aborting log aggregation for " + this.applicationId);
+    this.aborted.set(true); // doAppLogAggregation() returns early once this is set
+    this.notifyAll(); // wake the wait(THREAD_SLEEP_TIME) in the aggregation loop
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Aug 20 01:34:29 2014
@@ -142,9 +142,17 @@ public class LogAggregationService exten
    
   private void stopAggregators() {
     threadPool.shutdown();
+    // if recovery on restart is supported then leave outstanding aggregations
+    // to the next restart
+    boolean shouldAbort = context.getNMStateStore().canRecover()
+        && !context.getDecommissioned();
     // politely ask to finish
     for (AppLogAggregator aggregator : appLogAggregators.values()) {
-      aggregator.finishLogAggregation();
+      if (shouldAbort) {
+        aggregator.abortLogAggregation();
+      } else {
+        aggregator.finishLogAggregation();
+      }
     }
     while (!threadPool.isTerminated()) { // wait for all threads to finish
       for (ApplicationId appId : appLogAggregators.keySet()) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -35,19 +35,23 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
-import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -68,12 +72,17 @@ public class NMLeveldbStateStoreService 
   private static final String DB_NAME = "yarn-nm-state";
   private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
   
-  private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion
+  private static final Version CURRENT_VERSION_INFO = Version
       .newInstance(1, 0);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
 
+  private static final String APPLICATIONS_KEY_PREFIX =
+      "ContainerManager/applications/";
+  private static final String FINISHED_APPS_KEY_PREFIX =
+      "ContainerManager/finishedApps/";
+
   private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
   private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
       LOCALIZATION_KEY_PREFIX + "public/";
@@ -84,6 +93,14 @@ public class NMLeveldbStateStoreService 
   private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
   private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
 
+  private static final String CONTAINERS_KEY_PREFIX =
+      "ContainerManager/containers/";
+  private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
+  private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
+  private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+  private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
+  private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
+
   private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
   private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
   private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
@@ -98,6 +115,8 @@ public class NMLeveldbStateStoreService 
   private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
       CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
 
+  private static final byte[] EMPTY_VALUE = new byte[0];
+
   private DB db;
 
   public NMLeveldbStateStoreService() {
@@ -117,6 +136,246 @@ public class NMLeveldbStateStoreService 
 
 
   @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    // Scans every key under CONTAINERS_KEY_PREFIX. This loop only peeks at
+    // the next entry to find the container ID; loadContainerState() is what
+    // advances the iterator past that container's entries.
+    final int idStartPos = CONTAINERS_KEY_PREFIX.length();
+    List<RecoveredContainerState> recovered =
+        new ArrayList<RecoveredContainerState>();
+    LeveldbIterator dbIter = null;
+    try {
+      dbIter = new LeveldbIterator(db);
+      dbIter.seek(bytes(CONTAINERS_KEY_PREFIX));
+      while (dbIter.hasNext()) {
+        String key = asString(dbIter.peekNext().getKey());
+        if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
+          break;
+        }
+        int idEndPos = key.indexOf('/', idStartPos);
+        if (idEndPos < 0) {
+          throw new IOException("Unable to determine container in key: " + key);
+        }
+        ContainerId containerId =
+            ConverterUtils.toContainerId(key.substring(idStartPos, idEndPos));
+        String keyPrefix = key.substring(0, idEndPos + 1);  // keeps the '/'
+        recovered.add(loadContainerState(containerId, dbIter, keyPrefix));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (dbIter != null) {
+        dbIter.close();
+      }
+    }
+    return recovered;
+  }
+
+  private RecoveredContainerState loadContainerState(ContainerId containerId,
+      LeveldbIterator iter, String keyPrefix) throws IOException {
+    // Merges all keyPrefix entries; a COMPLETED status is never downgraded.
+    RecoveredContainerState recovered = new RecoveredContainerState();
+    recovered.status = RecoveredContainerStatus.REQUESTED;
+    while (iter.hasNext()) {
+      Entry<byte[],byte[]> current = iter.peekNext();
+      String dbKey = asString(current.getKey());
+      if (!dbKey.startsWith(keyPrefix)) {
+        break;  // entry is left unconsumed for the caller
+      }
+      iter.next();
+      String suffix = dbKey.substring(keyPrefix.length() - 1);  // keeps the '/'
+      if (CONTAINER_REQUEST_KEY_SUFFIX.equals(suffix)) {
+        recovered.startRequest = new StartContainerRequestPBImpl(
+            StartContainerRequestProto.parseFrom(current.getValue()));
+      } else if (CONTAINER_DIAGS_KEY_SUFFIX.equals(suffix)) {
+        recovered.diagnostics = asString(current.getValue());
+      } else if (CONTAINER_LAUNCHED_KEY_SUFFIX.equals(suffix)) {
+        if (recovered.status == RecoveredContainerStatus.REQUESTED) {
+          recovered.status = RecoveredContainerStatus.LAUNCHED;
+        }
+      } else if (CONTAINER_KILLED_KEY_SUFFIX.equals(suffix)) {
+        recovered.killed = true;
+      } else if (CONTAINER_EXIT_CODE_KEY_SUFFIX.equals(suffix)) {
+        recovered.status = RecoveredContainerStatus.COMPLETED;
+        recovered.exitCode = Integer.parseInt(asString(current.getValue()));
+      } else {
+        throw new IOException("Unexpected container state key: " + dbKey);
+      }
+    }
+    return recovered;
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+    // The stored value is the request's serialized protobuf.
+    StartContainerRequestPBImpl pb = (StartContainerRequestPBImpl) startRequest;
+    try {
+      db.put(bytes(CONTAINERS_KEY_PREFIX + containerId.toString()
+          + CONTAINER_REQUEST_KEY_SUFFIX), pb.getProto().toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+    try {
+      // The value is the builder's full current text, not an increment.
+      db.put(bytes(CONTAINERS_KEY_PREFIX + containerId.toString()
+          + CONTAINER_DIAGS_KEY_SUFFIX), bytes(diagnostics.toString()));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+    try {
+      // Marker entry: on recovery only the presence of this key is checked.
+      db.put(bytes(CONTAINERS_KEY_PREFIX + containerId.toString()
+          + CONTAINER_LAUNCHED_KEY_SUFFIX), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+    try {
+      // Marker entry: recovery sets the killed flag when this key exists.
+      db.put(bytes(CONTAINERS_KEY_PREFIX + containerId.toString()
+          + CONTAINER_KILLED_KEY_SUFFIX), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException {
+    byte[] exitCodeText = bytes(Integer.toString(exitCode));  // decimal text
+    try {
+      db.put(bytes(CONTAINERS_KEY_PREFIX + containerId.toString()
+          + CONTAINER_EXIT_CODE_KEY_SUFFIX), exitCodeText);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {
+    String[] suffixes = { CONTAINER_REQUEST_KEY_SUFFIX,
+        CONTAINER_DIAGS_KEY_SUFFIX, CONTAINER_LAUNCHED_KEY_SUFFIX,
+        CONTAINER_KILLED_KEY_SUFFIX, CONTAINER_EXIT_CODE_KEY_SUFFIX };
+    String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString();
+    try {
+      WriteBatch pendingDeletes = db.createWriteBatch();
+      try {
+        for (String suffix : suffixes) {
+          pendingDeletes.delete(bytes(keyPrefix + suffix));
+        }
+        db.write(pendingDeletes);  // all five deletes go out in one batch
+      } finally {
+        pendingDeletes.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+  @Override
+  public RecoveredApplicationsState loadApplicationsState()
+      throws IOException {
+    // Runs two prefix scans with one iterator: the stored application
+    // protos first, then the finished-application markers, whose
+    // application ID is taken from the key itself.
+    RecoveredApplicationsState recovered = new RecoveredApplicationsState();
+    recovered.applications = new ArrayList<ContainerManagerApplicationProto>();
+    LeveldbIterator dbIter = null;
+    try {
+      dbIter = new LeveldbIterator(db);
+      dbIter.seek(bytes(APPLICATIONS_KEY_PREFIX));
+      while (dbIter.hasNext()) {
+        Entry<byte[], byte[]> appEntry = dbIter.next();
+        String appKey = asString(appEntry.getKey());
+        if (!appKey.startsWith(APPLICATIONS_KEY_PREFIX)) {
+          break;
+        }
+        recovered.applications.add(
+            ContainerManagerApplicationProto.parseFrom(appEntry.getValue()));
+      }
+
+      recovered.finishedApplications = new ArrayList<ApplicationId>();
+      final int idStartPos = FINISHED_APPS_KEY_PREFIX.length();
+      dbIter.seek(bytes(FINISHED_APPS_KEY_PREFIX));
+      while (dbIter.hasNext()) {
+        String finishedKey = asString(dbIter.next().getKey());
+        if (!finishedKey.startsWith(FINISHED_APPS_KEY_PREFIX)) {
+          break;
+        }
+        recovered.finishedApplications.add(
+            ConverterUtils.toApplicationId(finishedKey.substring(idStartPos)));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (dbIter != null) {
+        dbIter.close();
+      }
+    }
+
+    return recovered;
+  }
+
+  @Override
+  public void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto p) throws IOException {
+    try {
+      // Keyed by the application ID; the value is the serialized proto.
+      db.put(bytes(APPLICATIONS_KEY_PREFIX + appId), p.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void storeFinishedApplication(ApplicationId appId)
+      throws IOException {
+    try {
+      // Marker entry: only the key matters, so reuse the shared EMPTY_VALUE.
+      db.put(bytes(FINISHED_APPS_KEY_PREFIX + appId), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId)
+      throws IOException {
+    try {
+      WriteBatch pendingDeletes = db.createWriteBatch();
+      try {
+        // Drop both the stored application record and its finished marker
+        // through the same write batch.
+        pendingDeletes.delete(bytes(APPLICATIONS_KEY_PREFIX + appId));
+        pendingDeletes.delete(bytes(FINISHED_APPS_KEY_PREFIX + appId));
+        db.write(pendingDeletes);
+      } finally {
+        pendingDeletes.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+  @Override
   public RecoveredLocalizationState loadLocalizationState()
       throws IOException {
     RecoveredLocalizationState state = new RecoveredLocalizationState();
@@ -617,14 +876,14 @@ public class NMLeveldbStateStoreService 
   }
 
 
-  NMDBSchemaVersion loadVersion() throws IOException {
+  Version loadVersion() throws IOException {
     byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
     // if version is not stored previously, treat it as 1.0.
     if (data == null || data.length == 0) {
-      return NMDBSchemaVersion.newInstance(1, 0);
+      return Version.newInstance(1, 0);
     }
-    NMDBSchemaVersion version =
-        new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+    Version version =
+        new VersionPBImpl(VersionProto.parseFrom(data));
     return version;
   }
 
@@ -634,14 +893,14 @@ public class NMLeveldbStateStoreService 
   
   // Only used for test
   @VisibleForTesting
-  void storeVersion(NMDBSchemaVersion state) throws IOException {
+  void storeVersion(Version state) throws IOException {
     dbStoreVersion(state);
   }
   
-  private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
+  private void dbStoreVersion(Version state) throws IOException {
     String key = DB_SCHEMA_VERSION_KEY;
     byte[] data = 
-        ((NMDBSchemaVersionPBImpl) state).getProto().toByteArray();
+        ((VersionPBImpl) state).getProto().toByteArray();
     try {
       db.put(bytes(key), data);
     } catch (DBException e) {
@@ -649,7 +908,7 @@ public class NMLeveldbStateStoreService 
     }
   }
 
-  NMDBSchemaVersion getCurrentVersion() {
+  Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
   }
   
@@ -664,9 +923,9 @@ public class NMLeveldbStateStoreService 
    *    upgrade NM state or remove incompatible old state.
    */
   private void checkVersion() throws IOException {
-    NMDBSchemaVersion loadedVersion = loadVersion();
+    Version loadedVersion = loadVersion();
     LOG.info("Loaded NM state version info " + loadedVersion);
-    if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+    if (loadedVersion.equals(getCurrentVersion())) {
       return;
     }
     if (loadedVersion.isCompatibleTo(getCurrentVersion())) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -19,13 +19,16 @@
 package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -43,6 +46,61 @@ public class NMNullStateStoreService ext
   }
 
   @Override
+  public RecoveredApplicationsState loadApplicationsState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto p) throws IOException {
+  }
+
+  @Override
+  public void storeFinishedApplication(ApplicationId appId) {
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId) throws IOException {
+  }
+
+  @Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException {
+  }
+
+  @Override
+  public void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException {
+  }
+
+  @Override
+  public void storeContainerLaunched(ContainerId containerId)
+      throws IOException {
+  }
+
+  @Override
+  public void storeContainerKilled(ContainerId containerId)
+      throws IOException {
+  }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId, int exitCode)
+      throws IOException {
+  }
+
+  @Override
+  public void removeContainer(ContainerId containerId) throws IOException {
+  }
+
+  @Override
   public RecoveredLocalizationState loadLocalizationState()
       throws IOException {
     throw new UnsupportedOperationException(

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -29,10 +29,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -45,6 +48,53 @@ public abstract class NMStateStoreServic
     super(name);
   }
 
+  public static class RecoveredApplicationsState {
+    List<ContainerManagerApplicationProto> applications;
+    List<ApplicationId> finishedApplications;
+    /** @return the applications list field itself (no copy is made) */
+    public List<ContainerManagerApplicationProto> getApplications() {
+      return applications;
+    }
+    /** @return the finishedApplications list field itself (no copy is made) */
+    public List<ApplicationId> getFinishedApplications() {
+      return finishedApplications;
+    }
+  }
+
+  public enum RecoveredContainerStatus {
+    REQUESTED,
+    LAUNCHED,
+    COMPLETED
+  }
+
+  public static class RecoveredContainerState {
+    RecoveredContainerStatus status;
+    int exitCode = ContainerExitStatus.INVALID;
+    boolean killed = false;
+    String diagnostics = "";
+    StartContainerRequest startRequest;
+    /** @return the recovered status (no default is assigned in this class) */
+    public RecoveredContainerStatus getStatus() {
+      return status;
+    }
+    /** @return the exit code, ContainerExitStatus.INVALID by default */
+    public int getExitCode() {
+      return exitCode;
+    }
+    /** @return the killed flag, false by default */
+    public boolean getKilled() {
+      return killed;
+    }
+    /** @return the diagnostics text, an empty string by default */
+    public String getDiagnostics() {
+      return diagnostics;
+    }
+    /** @return the start request (no default is assigned in this class) */
+    public StartContainerRequest getStartRequest() {
+      return startRequest;
+    }
+  }
+
   public static class LocalResourceTrackerState {
     List<LocalizedResourceProto> localizedResources =
         new ArrayList<LocalizedResourceProto>();
@@ -163,6 +213,100 @@ public abstract class NMStateStoreServic
 
 
   /**
+   * Load the state of applications
+   * @return recovered state for applications
+   * @throws IOException
+   */
+  public abstract RecoveredApplicationsState loadApplicationsState()
+      throws IOException;
+
+  /**
+   * Record the start of an application
+   * @param appId the application ID
+   * @param p state to store for the application
+   * @throws IOException
+   */
+  public abstract void storeApplication(ApplicationId appId,
+      ContainerManagerApplicationProto p) throws IOException;
+
+  /**
+   * Record that an application has finished
+   * @param appId the application ID
+   * @throws IOException
+   */
+  public abstract void storeFinishedApplication(ApplicationId appId)
+      throws IOException;
+
+  /**
+   * Remove records corresponding to an application
+   * @param appId the application ID
+   * @throws IOException
+   */
+  public abstract void removeApplication(ApplicationId appId)
+      throws IOException;
+
+
+  /**
+   * Load the state of containers
+   * @return recovered state for containers
+   * @throws IOException
+   */
+  public abstract List<RecoveredContainerState> loadContainersState()
+      throws IOException;
+
+  /**
+   * Record a container start request
+   * @param containerId the container ID
+   * @param startRequest the container start request
+   * @throws IOException
+   */
+  public abstract void storeContainer(ContainerId containerId,
+      StartContainerRequest startRequest) throws IOException;
+
+  /**
+   * Record that a container has been launched
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerLaunched(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record that a container has completed
+   * @param containerId the container ID
+   * @param exitCode the exit code from the container
+   * @throws IOException
+   */
+  public abstract void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException;
+
+  /**
+   * Record a request to kill a container
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerKilled(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record diagnostics for a container
+   * @param containerId the container ID
+   * @param diagnostics the container diagnostics
+   * @throws IOException
+   */
+  public abstract void storeContainerDiagnostics(ContainerId containerId,
+      StringBuilder diagnostics) throws IOException;
+
+  /**
+   * Remove records corresponding to a container
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void removeContainer(ContainerId containerId)
+      throws IOException;
+
+
+  /**
    * Load the state of localized resources
    * @return recovered localized resource state
    * @throws IOException
@@ -203,43 +347,111 @@ public abstract class NMStateStoreServic
       ApplicationId appId, Path localPath) throws IOException;
 
 
+  /**
+   * Load the state of the deletion service
+   * @return recovered deletion service state
+   * @throws IOException
+   */
   public abstract RecoveredDeletionServiceState loadDeletionServiceState()
       throws IOException;
 
+  /**
+   * Record a deletion task
+   * @param taskId the deletion task ID
+   * @param taskProto the deletion task protobuf
+   * @throws IOException
+   */
   public abstract void storeDeletionTask(int taskId,
       DeletionServiceDeleteTaskProto taskProto) throws IOException;
 
+  /**
+   * Remove records corresponding to a deletion task
+   * @param taskId the deletion task ID
+   * @throws IOException
+   */
   public abstract void removeDeletionTask(int taskId) throws IOException;
 
 
+  /**
+   * Load the state of NM tokens
+   * @return recovered state of NM tokens
+   * @throws IOException
+   */
   public abstract RecoveredNMTokensState loadNMTokensState()
       throws IOException;
 
+  /**
+   * Record the current NM token master key
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the previous NM token master key
+   * @param key the previous master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record a master key corresponding to an application
+   * @param attempt the application attempt ID
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt, MasterKey key) throws IOException;
 
+  /**
+   * Remove a master key corresponding to an application
+   * @param attempt the application attempt ID
+   * @throws IOException
+   */
   public abstract void removeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt) throws IOException;
 
 
+  /**
+   * Load the state of container tokens
+   * @return recovered state of container tokens
+   * @throws IOException
+   */
   public abstract RecoveredContainerTokensState loadContainerTokensState()
       throws IOException;
 
+  /**
+   * Record the current container token master key
+   * @param key the master key
+   * @throws IOException
+   */
   public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the previous container token master key
+   * @param key the previous master key
+   * @throws IOException
+   */
   public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
       throws IOException;
 
+  /**
+   * Record the expiration time for a container token
+   * @param containerId the container ID
+   * @param expirationTime the container token expiration time
+   * @throws IOException
+   */
   public abstract void storeContainerToken(ContainerId containerId,
       Long expirationTime) throws IOException;
 
+  /**
+   * Remove records for a container token
+   * @param containerId the container ID
+   * @throws IOException
+   */
   public abstract void removeContainerToken(ContainerId containerId)
       throws IOException;
 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Wed Aug 20 01:34:29 2014
@@ -55,7 +55,9 @@ public class WebServer extends AbstractS
 
   @Override
   protected void serviceStart() throws Exception {
-    String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig());
+    String bindAddress = WebAppUtils.getWebAppBindURL(getConfig(),
+                          YarnConfiguration.NM_BIND_HOST,
+                          WebAppUtils.getNMWebAppURLWithoutScheme(getConfig()));
     
     LOG.info("Instantiating NMWebApp at " + bindAddress);
     try {