You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [11/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -32,9 +32,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -43,6 +44,8 @@ import com.google.common.annotations.Vis
 public class MemoryRMStateStore extends RMStateStore {
   
   RMState state = new RMState();
+  private int epoch = 0;
+  
   @VisibleForTesting
   public RMState getState() {
     return state;
@@ -53,6 +56,13 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    int currentEpoch = epoch;
+    epoch = epoch + 1;
+    return currentEpoch;
+  }
+
+  @Override
   public synchronized RMState loadState() throws Exception {
     // return a copy of the state to allow for modification of the real state
     RMState returnState = new RMState();
@@ -63,6 +73,10 @@ public class MemoryRMStateStore extends 
       state.rmSecretManagerState.getTokenState());
     returnState.rmSecretManagerState.dtSequenceNumber =
         state.rmSecretManagerState.dtSequenceNumber;
+    returnState.amrmTokenSecretManagerState =
+        state.amrmTokenSecretManagerState == null ? null
+            : AMRMTokenSecretManagerState
+              .newInstance(state.amrmTokenSecretManagerState);
     return returnState;
   }
   
@@ -80,7 +94,7 @@ public class MemoryRMStateStore extends 
 
   @Override
   public void storeApplicationStateInternal(ApplicationId appId,
-                                     ApplicationStateDataPBImpl appStateData)
+                                     ApplicationStateData appStateData)
       throws Exception {
     ApplicationState appState =
         new ApplicationState(appStateData.getSubmitTime(),
@@ -92,7 +106,7 @@ public class MemoryRMStateStore extends 
 
   @Override
   public void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     ApplicationState updatedAppState =
         new ApplicationState(appStateData.getSubmitTime(),
           appStateData.getStartTime(),
@@ -112,7 +126,7 @@ public class MemoryRMStateStore extends 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptStateData attemptStateData)
       throws Exception {
     Credentials credentials = null;
     if(attemptStateData.getAppAttemptTokens() != null){
@@ -137,7 +151,7 @@ public class MemoryRMStateStore extends 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptStateData attemptStateData)
       throws Exception {
     Credentials credentials = null;
     if (attemptStateData.getAppAttemptTokens() != null) {
@@ -152,7 +166,8 @@ public class MemoryRMStateStore extends 
           attemptStateData.getStartTime(), attemptStateData.getState(),
           attemptStateData.getFinalTrackingUrl(),
           attemptStateData.getDiagnostics(),
-          attemptStateData.getFinalApplicationStatus());
+          attemptStateData.getFinalApplicationStatus(),
+          attemptStateData.getAMContainerExitStatus());
 
     ApplicationState appState =
         state.getApplicationState().get(
@@ -244,7 +259,7 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
-  protected RMStateVersion loadVersion() throws Exception {
+  protected Version loadVersion() throws Exception {
     return null;
   }
 
@@ -253,8 +268,22 @@ public class MemoryRMStateStore extends 
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     return null;
   }
 
+  @Override
+  public void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate) {
+    if (amrmTokenSecretManagerState != null) {
+      state.amrmTokenSecretManagerState = AMRMTokenSecretManagerState
+          .newInstance(amrmTokenSecretManagerState);
+    }
+  }
+
+  @Override
+  public void deleteStore() throws Exception {
+  }
+
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -25,9 +25,10 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 
 @Unstable
 public class NullRMStateStore extends RMStateStore {
@@ -48,19 +49,24 @@ public class NullRMStateStore extends RM
   }
 
   @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    return 0;
+  }
+
+  @Override
   public RMState loadState() throws Exception {
     throw new UnsupportedOperationException("Cannot load state from null store");
   }
 
   @Override
   protected void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     // Do nothing
   }
 
   @Override
   protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+      ApplicationAttemptStateData attemptStateData) throws Exception {
     // Do nothing
   }
 
@@ -102,13 +108,13 @@ public class NullRMStateStore extends RM
 
   @Override
   protected void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     // Do nothing 
   }
 
   @Override
   protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+      ApplicationAttemptStateData attemptStateData) throws Exception {
   }
 
   @Override
@@ -117,7 +123,7 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected RMStateVersion loadVersion() throws Exception {
+  protected Version loadVersion() throws Exception {
     // Do nothing
     return null;
   }
@@ -128,9 +134,20 @@ public class NullRMStateStore extends RM
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     // Do nothing
     return null;
   }
 
+  @Override
+  public void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState state, boolean isUpdate) {
+    // Do nothing
+  }
+
+  @Override
+  public void deleteStore() throws Exception {
+    // Do nothing
+  }
+
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -31,36 +30,39 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
 
 @Private
 @Unstable
@@ -79,12 +81,174 @@ public abstract class RMStateStore exten
   protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
   protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
       "RMDTSequenceNumber_";
+  protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
+      "AMRMTokenSecretManagerRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
+  protected static final String EPOCH_NODE = "EpochNode";
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
+  private enum RMStateStoreState {
+    DEFAULT
+  };
+
+  private static final StateMachineFactory<RMStateStore,
+                                           RMStateStoreState,
+                                           RMStateStoreEventType, 
+                                           RMStateStoreEvent>
+      stateMachineFactory = new StateMachineFactory<RMStateStore,
+                                                    RMStateStoreState,
+                                                    RMStateStoreEventType,
+                                                    RMStateStoreEvent>(
+      RMStateStoreState.DEFAULT)
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
+
+  private final StateMachine<RMStateStoreState,
+                             RMStateStoreEventType,
+                             RMStateStoreEvent> stateMachine;
+
+  private static class StoreAppTransition
+      implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
+      ApplicationId appId = appState.getAppId();
+      ApplicationStateData appStateData = ApplicationStateData
+          .newInstance(appState);
+      LOG.info("Storing info for app: " + appId);
+      try {
+        store.storeApplicationStateInternal(appId, appStateData);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_NEW_SAVED));
+      } catch (Exception e) {
+        LOG.error("Error storing app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class UpdateAppTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateUpdateAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
+      ApplicationId appId = appState.getAppId();
+      ApplicationStateData appStateData = ApplicationStateData
+          .newInstance(appState);
+      LOG.info("Updating info for app: " + appId);
+      try {
+        store.updateApplicationStateInternal(appId, appStateData);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_UPDATE_SAVED));
+      } catch (Exception e) {
+        LOG.error("Error updating app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class RemoveAppTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreRemoveAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
+          .getAppState();
+      ApplicationId appId = appState.getAppId();
+      LOG.info("Removing info for app: " + appId);
+      try {
+        store.removeApplicationStateInternal(appState);
+      } catch (Exception e) {
+        LOG.error("Error removing app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class StoreAppAttemptTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreAppAttemptEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationAttemptState attemptState =
+          ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+      try {
+        ApplicationAttemptStateData attemptStateData = 
+            ApplicationAttemptStateData.newInstance(attemptState);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+        }
+        store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+            attemptStateData);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
+      } catch (Exception e) {
+        LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class UpdateAppAttemptTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationAttemptState attemptState =
+          ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
+      try {
+        ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
+            .newInstance(attemptState);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
+        }
+        store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+            attemptStateData);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
+      } catch (Exception e) {
+        LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
   public RMStateStore() {
     super(RMStateStore.class.getName());
+    stateMachine = stateMachineFactory.make(this);
   }
 
   /**
@@ -99,19 +263,21 @@ public abstract class RMStateStore exten
     RMAppAttemptState state;
     String finalTrackingUrl = "N/A";
     String diagnostics;
+    int exitStatus = ContainerExitStatus.INVALID;
     FinalApplicationStatus amUnregisteredFinalStatus;
 
     public ApplicationAttemptState(ApplicationAttemptId attemptId,
         Container masterContainer, Credentials appAttemptCredentials,
         long startTime) {
       this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
-        null, "", null);
+        null, "", null, ContainerExitStatus.INVALID);
     }
 
     public ApplicationAttemptState(ApplicationAttemptId attemptId,
         Container masterContainer, Credentials appAttemptCredentials,
         long startTime, RMAppAttemptState state, String finalTrackingUrl,
-        String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) {
+        String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
+        int exitStatus) {
       this.attemptId = attemptId;
       this.masterContainer = masterContainer;
       this.appAttemptCredentials = appAttemptCredentials;
@@ -120,6 +286,7 @@ public abstract class RMStateStore exten
       this.finalTrackingUrl = finalTrackingUrl;
       this.diagnostics = diagnostics == null ? "" : diagnostics;
       this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
+      this.exitStatus = exitStatus;
     }
 
     public Container getMasterContainer() {
@@ -146,6 +313,9 @@ public abstract class RMStateStore exten
     public FinalApplicationStatus getFinalApplicationStatus() {
       return amUnregisteredFinalStatus;
     }
+    public int getAMContainerExitStatus(){
+      return this.exitStatus;
+    }
   }
   
   /**
@@ -244,6 +414,8 @@ public abstract class RMStateStore exten
 
     RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
 
+    AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
+
     public Map<ApplicationId, ApplicationState> getApplicationState() {
       return appState;
     }
@@ -251,6 +423,10 @@ public abstract class RMStateStore exten
     public RMDTSecretManagerState getRMDTSecretManagerState() {
       return rmSecretManagerState;
     }
+
+    public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
+      return amrmTokenSecretManagerState;
+    }
   }
     
   private Dispatcher rmDispatcher;
@@ -319,14 +495,14 @@ public abstract class RMStateStore exten
    *    upgrade RM state.
    */
   public void checkVersion() throws Exception {
-    RMStateVersion loadedVersion = loadVersion();
+    Version loadedVersion = loadVersion();
     LOG.info("Loaded RM state version info " + loadedVersion);
     if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
       return;
     }
     // if there is no version info, treat it as 1.0;
     if (loadedVersion == null) {
-      loadedVersion = RMStateVersion.newInstance(1, 0);
+      loadedVersion = Version.newInstance(1, 0);
     }
     if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
       LOG.info("Storing RM state version info " + getCurrentVersion());
@@ -342,7 +518,7 @@ public abstract class RMStateStore exten
    * Derived class use this method to load the version information from state
    * store.
    */
-  protected abstract RMStateVersion loadVersion() throws Exception;
+  protected abstract Version loadVersion() throws Exception;
 
   /**
    * Derived class use this method to store the version information.
@@ -352,8 +528,14 @@ public abstract class RMStateStore exten
   /**
    * Get the current version of the underlying state store.
    */
-  protected abstract RMStateVersion getCurrentVersion();
+  protected abstract Version getCurrentVersion();
 
+
+  /**
+   * Get the current epoch of RM and increment the value.
+   */
+  public abstract int getAndIncrementEpoch() throws Exception;
+  
   /**
    * Blocking API
    * The derived class must recover state from the store and return a new 
@@ -390,10 +572,10 @@ public abstract class RMStateStore exten
    * application.
    */
   protected abstract void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception;
+      ApplicationStateData appStateData) throws Exception;
 
   protected abstract void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception;
+      ApplicationStateData appStateData) throws Exception;
   
   @SuppressWarnings("unchecked")
   /**
@@ -428,11 +610,11 @@ public abstract class RMStateStore exten
    */
   protected abstract void storeApplicationAttemptStateInternal(
       ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+      ApplicationAttemptStateData attemptStateData) throws Exception;
 
   protected abstract void updateApplicationAttemptStateInternal(
       ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+      ApplicationAttemptStateData attemptStateData) throws Exception;
 
   /**
    * RMDTSecretManager call this to store the state of a delegation token
@@ -540,6 +722,14 @@ public abstract class RMStateStore exten
       throws Exception;
 
   /**
+   * Blocking API. Derived classes must implement this method to store or update
+   * the state of the AMRMToken master key.
+   */
+  public abstract void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate);
+
+  /**
    * Non-blocking API
    * ResourceManager services call this to remove an application from the state
    * store
@@ -581,10 +771,7 @@ public abstract class RMStateStore exten
   
   public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
     Credentials credentials = new Credentials();
-    Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
-    if(appToken != null){
-      credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
-    }
+
     SecretKey clientTokenMasterKey =
         appAttempt.getClientTokenMasterKey();
     if(clientTokenMasterKey != null){
@@ -596,105 +783,10 @@ public abstract class RMStateStore exten
 
   // Dispatcher related code
   protected void handleStoreEvent(RMStateStoreEvent event) {
-    if (event.getType().equals(RMStateStoreEventType.STORE_APP)
-        || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
-      ApplicationState appState = null;
-      if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-        appState = ((RMStateStoreAppEvent) event).getAppState();
-      } else {
-        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-        appState = ((RMStateUpdateAppEvent) event).getAppState();
-      }
-
-      Exception storedException = null;
-      ApplicationStateDataPBImpl appStateData =
-          (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
-            .newApplicationStateData(appState.getSubmitTime(),
-              appState.getStartTime(), appState.getUser(),
-              appState.getApplicationSubmissionContext(), appState.getState(),
-              appState.getDiagnostics(), appState.getFinishTime());
-
-      ApplicationId appId =
-          appState.getApplicationSubmissionContext().getApplicationId();
-
-      LOG.info("Storing info for app: " + appId);
-      try {
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-          storeApplicationStateInternal(appId, appStateData);
-          notifyDoneStoringApplication(appId, storedException);
-        } else {
-          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-          updateApplicationStateInternal(appId, appStateData);
-          notifyDoneUpdatingApplication(appId, storedException);
-        }
-      } catch (Exception e) {
-        LOG.error("Error storing/updating app: " + appId, e);
-        notifyStoreOperationFailed(e);
-      }
-    } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
-        || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
-
-      ApplicationAttemptState attemptState = null;
-      if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-        attemptState =
-            ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
-      } else {
-        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-        attemptState =
-            ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
-      }
-
-      Exception storedException = null;
-      Credentials credentials = attemptState.getAppAttemptCredentials();
-      ByteBuffer appAttemptTokens = null;
-      try {
-        if (credentials != null) {
-          DataOutputBuffer dob = new DataOutputBuffer();
-          credentials.writeTokenStorageToStream(dob);
-          appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-        }
-        ApplicationAttemptStateDataPBImpl attemptStateData =
-            (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
-              .newApplicationAttemptStateData(attemptState.getAttemptId(),
-                attemptState.getMasterContainer(), appAttemptTokens,
-                attemptState.getStartTime(), attemptState.getState(),
-                attemptState.getFinalTrackingUrl(),
-                attemptState.getDiagnostics(),
-                attemptState.getFinalApplicationStatus());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
-        }
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-          storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
-              attemptStateData);
-          notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
-              storedException);
-        } else {
-          assert event.getType().equals(
-            RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-          updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
-              attemptStateData);
-          notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
-              storedException);
-        }
-      } catch (Exception e) {
-        LOG.error(
-            "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
-        notifyStoreOperationFailed(e);
-      }
-    } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
-      ApplicationState appState =
-          ((RMStateStoreRemoveAppEvent) event).getAppState();
-      ApplicationId appId = appState.getAppId();
-      LOG.info("Removing info for app: " + appId);
-      try {
-        removeApplicationStateInternal(appState);
-      } catch (Exception e) {
-        LOG.error("Error removing app: " + appId, e);
-        notifyStoreOperationFailed(e);
-      }
-    } else {
-      LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+    try {
+      this.stateMachine.doTransition(event.getType(), event);
+    } catch (InvalidStateTransitonException e) {
+      LOG.error("Can't handle this event at current state", e);
     }
   }
 
@@ -713,47 +805,28 @@ public abstract class RMStateStore exten
     }
     rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
   }
-
+ 
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application that new application is stored in state store
-   * @param appId id of the application that has been saved
-   * @param storedException the exception that is thrown when storing the
-   * application
-   */
-  private void notifyDoneStoringApplication(ApplicationId appId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppNewSavedEvent(appId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplication(ApplicationId appId,
-      Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppUpdateSavedEvent(appId, storedException));
+   * This method is called to notify the application that it has been
+   * stored or updated in the state store.
+   * @param event App event containing the app id and event type
+   */
+  private void notifyApplication(RMAppEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application attempt that new attempt is stored in state store
-   * @param appAttempt attempt that has been saved
-   */
-  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppAttemptNewSavedEvent(attemptId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
-      Exception updatedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
+   * This method is called to notify the application attempt that it
+   * has been stored or updated in the state store.
+   * @param event App attempt event containing the app attempt
+   * id and event type
+   */
+  private void notifyApplicationAttempt(RMAppAttemptEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 
@@ -766,4 +839,10 @@ public abstract class RMStateStore exten
       handleStoreEvent(event);
     }
   }
+
+  /**
+   * Derived classes must implement this method to delete the state store
+   * @throws Exception
+   */
+  public abstract void deleteStore() throws Exception;
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -44,15 +44,23 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -70,6 +78,11 @@ import org.apache.zookeeper.server.auth.
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Changes from 1.1 to 1.2: the AMRMTokenSecretManager state is now saved
+ * separately, with both the currentMasterKey and the nextMasterKey stored.
+ * Also, the AMRMToken has been removed from ApplicationAttemptState.
+ */
 @Private
 @Unstable
 public class ZKRMStateStore extends RMStateStore {
@@ -78,8 +91,8 @@ public class ZKRMStateStore extends RMSt
   private final SecureRandom random = new SecureRandom();
 
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
-  protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
-      .newInstance(1, 0);
+  protected static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(1, 2);
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -90,7 +103,9 @@ public class ZKRMStateStore extends RMSt
 
   private String zkHostPort = null;
   private int zkSessionTimeout;
-  private long zkRetryInterval;
+
+  @VisibleForTesting
+  long zkRetryInterval;
   private List<ACL> zkAcl;
   private List<ZKUtil.ZKAuthInfo> zkAuths;
 
@@ -98,6 +113,7 @@ public class ZKRMStateStore extends RMSt
    *
    * ROOT_DIR_PATH
    * |--- VERSION_INFO
+   * |--- EPOCH_NODE
    * |--- RM_ZK_FENCING_LOCK
    * |--- RM_APP_ROOT
    * |     |----- (#ApplicationId1)
@@ -118,6 +134,9 @@ public class ZKRMStateStore extends RMSt
    *        |      |----- Key_1
    *        |      |----- Key_2
    *                ....
+   * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
+   *        |----- currentMasterKey
+   *        |----- nextMasterKey
    *
    */
   private String zkRootNodePath;
@@ -126,6 +145,7 @@ public class ZKRMStateStore extends RMSt
   private String dtMasterKeysRootPath;
   private String delegationTokensRootPath;
   private String dtSequenceNumberPath;
+  private String amrmTokenSecretManagerRoot;
 
   @VisibleForTesting
   protected String znodeWorkingPath;
@@ -199,9 +219,14 @@ public class ZKRMStateStore extends RMSt
     zkSessionTimeout =
         conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
             YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-    zkRetryInterval =
-        conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
-          YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+
+    if (HAUtil.isHAEnabled(conf)) {
+      zkRetryInterval = zkSessionTimeout / numRetries;
+    } else {
+      zkRetryInterval =
+          conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+              YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+    }
 
     zkAcl = RMZKUtils.getZKAcls(conf);
     zkAuths = RMZKUtils.getZKAuths(conf);
@@ -240,6 +265,8 @@ public class ZKRMStateStore extends RMSt
         RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
     dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
         RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
+    amrmTokenSecretManagerRoot =
+        getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
   }
 
   @Override
@@ -260,24 +287,26 @@ public class ZKRMStateStore extends RMSt
     createRootDir(dtMasterKeysRootPath);
     createRootDir(delegationTokensRootPath);
     createRootDir(dtSequenceNumberPath);
+    createRootDir(amrmTokenSecretManagerRoot);
   }
 
   private void createRootDir(final String rootPath) throws Exception {
     // For root dirs, we shouldn't use the doMulti helper methods
-    try {
-      new ZKAction<String>() {
-        @Override
-        public String run() throws KeeperException, InterruptedException {
+    new ZKAction<String>() {
+      @Override
+      public String run() throws KeeperException, InterruptedException {
+        try {
           return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+        } catch (KeeperException ke) {
+          if (ke.code() == Code.NODEEXISTS) {
+            LOG.debug(rootPath + "znode already exists!");
+            return null;
+          } else {
+            throw ke;
+          }
         }
-      }.runWithRetries();
-    } catch (KeeperException ke) {
-      if (ke.code() == Code.NODEEXISTS) {
-        LOG.debug(rootPath + "znode already exists!");
-      } else {
-        throw ke;
       }
-    }
+    }.runWithRetries();
   }
 
   private void logRootNodeAcls(String prefix) throws Exception {
@@ -353,7 +382,7 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
   }
 
@@ -361,7 +390,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void storeVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
     byte[] data =
-        ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
     if (existsWithRetries(versionNodePath, true) != null) {
       setDataWithRetries(versionNodePath, data, -1);
     } else {
@@ -370,28 +399,68 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  protected synchronized RMStateVersion loadVersion() throws Exception {
+  protected synchronized Version loadVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
 
     if (existsWithRetries(versionNodePath, true) != null) {
       byte[] data = getDataWithRetries(versionNodePath, true);
-      RMStateVersion version =
-          new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+      Version version =
+          new VersionPBImpl(VersionProto.parseFrom(data));
       return version;
     }
     return null;
   }
 
   @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
+    int currentEpoch = 0;
+    if (existsWithRetries(epochNodePath, true) != null) {
+      // load current epoch
+      byte[] data = getDataWithRetries(epochNodePath, true);
+      Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
+      currentEpoch = epoch.getEpoch();
+      // increment epoch and store it
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      setDataWithRetries(epochNodePath, storeData, -1);
+    } else {
+      // initialize epoch node with 1 for the next time.
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
+    }
+    return currentEpoch;
+  }
+
+  @Override
   public synchronized RMState loadState() throws Exception {
     RMState rmState = new RMState();
     // recover DelegationTokenSecretManager
     loadRMDTSecretManagerState(rmState);
     // recover RM applications
     loadRMAppState(rmState);
+    // recover AMRMTokenSecretManager
+    loadAMRMTokenSecretManagerState(rmState);
     return rmState;
   }
 
+  private void loadAMRMTokenSecretManagerState(RMState rmState)
+      throws Exception {
+    byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
+    if (data == null) {
+      LOG.warn("There is no data saved");
+      return;
+    }
+    AMRMTokenSecretManagerStatePBImpl stateData =
+        new AMRMTokenSecretManagerStatePBImpl(
+          AMRMTokenSecretManagerStateProto.parseFrom(data));
+    rmState.amrmTokenSecretManagerState =
+        AMRMTokenSecretManagerState.newInstance(
+          stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
+
+  }
+
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
       throws Exception {
     loadRMDelegationKeyState(rmState);
@@ -529,22 +598,22 @@ public class ZKRMStateStore extends RMSt
 
         ApplicationAttemptState attemptState =
             new ApplicationAttemptState(attemptId,
-                attemptStateData.getMasterContainer(), credentials,
-                attemptStateData.getStartTime(),
-                attemptStateData.getState(),
-                attemptStateData.getFinalTrackingUrl(),
-                attemptStateData.getDiagnostics(),
-                attemptStateData.getFinalApplicationStatus());
+              attemptStateData.getMasterContainer(), credentials,
+              attemptStateData.getStartTime(), attemptStateData.getState(),
+              attemptStateData.getFinalTrackingUrl(),
+              attemptStateData.getDiagnostics(),
+              attemptStateData.getFinalApplicationStatus(),
+              attemptStateData.getAMContainerExitStatus());
 
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     }
-    LOG.info("Done Loading applications from ZK state store");
+    LOG.debug("Done Loading applications from ZK state store");
   }
 
   @Override
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
@@ -558,7 +627,7 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   public synchronized void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
@@ -572,7 +641,7 @@ public class ZKRMStateStore extends RMSt
     } else {
       createWithRetries(nodeUpdatePath, appStateData, zkAcl,
         CreateMode.PERSISTENT);
-      LOG.info(appId + " znode didn't exist. Created a new znode to"
+      LOG.debug(appId + " znode didn't exist. Created a new znode to"
           + " update the application state.");
     }
   }
@@ -580,7 +649,7 @@ public class ZKRMStateStore extends RMSt
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     String appDirPath = getNodePath(rmAppRoot,
         appAttemptId.getApplicationId().toString());
@@ -598,7 +667,7 @@ public class ZKRMStateStore extends RMSt
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     String appIdStr = appAttemptId.getApplicationId().toString();
     String appAttemptIdStr = appAttemptId.toString();
@@ -615,7 +684,7 @@ public class ZKRMStateStore extends RMSt
     } else {
       createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
         CreateMode.PERSISTENT);
-      LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+      LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
           + " update the application attempt state.");
     }
   }
@@ -664,7 +733,7 @@ public class ZKRMStateStore extends RMSt
     if (existsWithRetries(nodeRemovePath, true) != null) {
       opList.add(Op.delete(nodeRemovePath, -1));
     } else {
-      LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
     doMultiWithRetries(opList);
   }
@@ -681,7 +750,7 @@ public class ZKRMStateStore extends RMSt
       // in case znode doesn't exist
       addStoreOrUpdateOps(
           opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
-      LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
     } else {
       // in case znode exists
       addStoreOrUpdateOps(
@@ -763,7 +832,14 @@ public class ZKRMStateStore extends RMSt
     if (existsWithRetries(nodeRemovePath, true) != null) {
       doMultiWithRetries(Op.delete(nodeRemovePath, -1));
     } else {
-      LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
+    }
+  }
+
+  @Override
+  public synchronized void deleteStore() throws Exception {
+    if (existsWithRetries(zkRootNodePath, true) != null) {
+      deleteWithRetries(zkRootNodePath, true);
     }
   }
 
@@ -816,7 +892,7 @@ public class ZKRMStateStore extends RMSt
         case Expired:
           // the connection got terminated because of session timeout
           // call listener to reconnect
-          LOG.info("Session expired");
+          LOG.info("ZKRMStateStore Session expired");
           createConnection();
           break;
         default:
@@ -921,6 +997,29 @@ public class ZKRMStateStore extends RMSt
     }.runWithRetries();
   }
 
+  private void deleteWithRetries(
+      final String path, final boolean watch) throws Exception {
+    new ZKAction<Void>() {
+      @Override
+      Void run() throws KeeperException, InterruptedException {
+        recursiveDeleteWithRetriesHelper(path, watch);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  /**
+   * Helper method that deletes znodes recursively
+   */
+  private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
+          throws KeeperException, InterruptedException {
+    List<String> children = zkClient.getChildren(path, watch);
+    for (String child : children) {
+      recursiveDeleteWithRetriesHelper(path + "/" + child, false);
+    }
+    zkClient.delete(path, -1);
+  }
+
   /**
    * Helper class that periodically attempts creating a znode to ensure that
    * this RM continues to be the Active.
@@ -991,13 +1090,13 @@ public class ZKRMStateStore extends RMSt
             throw new StoreFencedException();
           }
         } catch (KeeperException ke) {
+          LOG.info("Exception while executing a ZK operation.", ke);
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
-            LOG.info("Waiting for zookeeper to be connected, retry no. + "
-                + retry);
+            LOG.info("Retrying operation on ZK. Retry no. " + retry);
             Thread.sleep(zkRetryInterval);
             continue;
           }
-          LOG.debug("Error while doing ZK operation.", ke);
+          LOG.info("Maxed out ZK retries. Giving up!");
           throw ke;
         }
       }
@@ -1044,4 +1143,19 @@ public class ZKRMStateStore extends RMSt
     return zk;
   }
 
+  @Override
+  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate) {
+    AMRMTokenSecretManagerState data =
+        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+    byte[] stateData = data.getProto().toByteArray();
+    try {
+      setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
+    } catch (Exception ex) {
+      LOG.info("Error storing info for AMRMTokenSecretManager", ex);
+      notifyStoreOperationFailed(ex);
+    }
+  }
+
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Tue Aug 19 23:49:39 2014
@@ -18,31 +18,74 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
 
 /*
  * Contains the state data that needs to be persisted for an ApplicationAttempt
  */
 @Public
 @Unstable
-public interface ApplicationAttemptStateData {
-  
+public abstract class ApplicationAttemptStateData {
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptId attemptId, Container container,
+      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+      String finalTrackingUrl, String diagnostics,
+      FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
+    ApplicationAttemptStateData attemptStateData =
+        Records.newRecord(ApplicationAttemptStateData.class);
+    attemptStateData.setAttemptId(attemptId);
+    attemptStateData.setMasterContainer(container);
+    attemptStateData.setAppAttemptTokens(attemptTokens);
+    attemptStateData.setState(finalState);
+    attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
+    attemptStateData.setDiagnostics(diagnostics);
+    attemptStateData.setStartTime(startTime);
+    attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
+    attemptStateData.setAMContainerExitStatus(exitStatus);
+    return attemptStateData;
+  }
+
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptState attemptState) throws IOException {
+    Credentials credentials = attemptState.getAppAttemptCredentials();
+    ByteBuffer appAttemptTokens = null;
+    if (credentials != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    }
+    return newInstance(attemptState.getAttemptId(),
+      attemptState.getMasterContainer(), appAttemptTokens,
+      attemptState.getStartTime(), attemptState.getState(),
+      attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
+      attemptState.getFinalApplicationStatus(),
+      attemptState.getAMContainerExitStatus());
+  }
+
+  public abstract ApplicationAttemptStateDataProto getProto();
+
   /**
    * The ApplicationAttemptId for the application attempt
    * @return ApplicationAttemptId for the application attempt
    */
   @Public
   @Unstable
-  public ApplicationAttemptId getAttemptId();
+  public abstract ApplicationAttemptId getAttemptId();
   
-  public void setAttemptId(ApplicationAttemptId attemptId);
+  public abstract void setAttemptId(ApplicationAttemptId attemptId);
   
   /*
    * The master container running the application attempt
@@ -50,9 +93,9 @@ public interface ApplicationAttemptState
    */
   @Public
   @Unstable
-  public Container getMasterContainer();
+  public abstract Container getMasterContainer();
   
-  public void setMasterContainer(Container container);
+  public abstract void setMasterContainer(Container container);
 
   /**
    * The application attempt tokens that belong to this attempt
@@ -60,17 +103,17 @@ public interface ApplicationAttemptState
    */
   @Public
   @Unstable
-  public ByteBuffer getAppAttemptTokens();
+  public abstract ByteBuffer getAppAttemptTokens();
 
-  public void setAppAttemptTokens(ByteBuffer attemptTokens);
+  public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
 
   /**
    * Get the final state of the application attempt.
    * @return the final state of the application attempt.
    */
-  public RMAppAttemptState getState();
+  public abstract RMAppAttemptState getState();
 
-  public void setState(RMAppAttemptState state);
+  public abstract void setState(RMAppAttemptState state);
 
   /**
    * Get the original not-proxied <em>final tracking url</em> for the
@@ -79,34 +122,39 @@ public interface ApplicationAttemptState
    * @return the original not-proxied <em>final tracking url</em> for the
    *         application
    */
-  public String getFinalTrackingUrl();
+  public abstract String getFinalTrackingUrl();
 
   /**
    * Set the final tracking Url of the AM.
    * @param url
    */
-  public void setFinalTrackingUrl(String url);
+  public abstract void setFinalTrackingUrl(String url);
   /**
    * Get the <em>diagnositic information</em> of the attempt 
    * @return <em>diagnositic information</em> of the attempt
    */
-  public String getDiagnostics();
+  public abstract String getDiagnostics();
 
-  public void setDiagnostics(String diagnostics);
+  public abstract void setDiagnostics(String diagnostics);
 
   /**
    * Get the <em>start time</em> of the application.
    * @return <em>start time</em> of the application
    */
-  public long getStartTime();
+  public abstract long getStartTime();
 
-  public void setStartTime(long startTime);
+  public abstract void setStartTime(long startTime);
 
   /**
    * Get the <em>final finish status</em> of the application.
    * @return <em>final finish status</em> of the application
    */
-  public FinalApplicationStatus getFinalApplicationStatus();
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finishState);
+
+  public abstract int getAMContainerExitStatus();
 
-  public void setFinalApplicationStatus(FinalApplicationStatus finishState);
+  public abstract void setAMContainerExitStatus(int exitStatus);
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Contains all the state data that needs to be stored persistently 
@@ -32,19 +35,43 @@ import org.apache.hadoop.yarn.server.res
  */
 @Public
 @Unstable
-public interface ApplicationStateData {
-  
+public abstract class ApplicationStateData {
+  public static ApplicationStateData newInstance(long submitTime,
+      long startTime, String user,
+      ApplicationSubmissionContext submissionContext,
+      RMAppState state, String diagnostics, long finishTime) {
+    ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
+    appState.setSubmitTime(submitTime);
+    appState.setStartTime(startTime);
+    appState.setUser(user);
+    appState.setApplicationSubmissionContext(submissionContext);
+    appState.setState(state);
+    appState.setDiagnostics(diagnostics);
+    appState.setFinishTime(finishTime);
+    return appState;
+  }
+
+  public static ApplicationStateData newInstance(
+      ApplicationState appState) {
+    return newInstance(appState.getSubmitTime(), appState.getStartTime(),
+        appState.getUser(), appState.getApplicationSubmissionContext(),
+        appState.getState(), appState.getDiagnostics(),
+        appState.getFinishTime());
+  }
+
+  public abstract ApplicationStateDataProto getProto();
+
   /**
    * The time at which the application was received by the Resource Manager
    * @return submitTime
    */
   @Public
   @Unstable
-  public long getSubmitTime();
+  public abstract long getSubmitTime();
   
   @Public
   @Unstable
-  public void setSubmitTime(long submitTime);
+  public abstract void setSubmitTime(long submitTime);
 
   /**
    * Get the <em>start time</em> of the application.
@@ -63,11 +90,11 @@ public interface ApplicationStateData {
    */
   @Public
   @Unstable
-  public void setUser(String user);
+  public abstract void setUser(String user);
   
   @Public
   @Unstable
-  public String getUser();
+  public abstract String getUser();
   
   /**
    * The {@link ApplicationSubmissionContext} for the application
@@ -76,34 +103,34 @@ public interface ApplicationStateData {
    */
   @Public
   @Unstable
-  public ApplicationSubmissionContext getApplicationSubmissionContext();
+  public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
   
   @Public
   @Unstable
-  public void setApplicationSubmissionContext(
+  public abstract void setApplicationSubmissionContext(
       ApplicationSubmissionContext context);
 
   /**
    * Get the final state of the application.
    * @return the final state of the application.
    */
-  public RMAppState getState();
+  public abstract RMAppState getState();
 
-  public void setState(RMAppState state);
+  public abstract void setState(RMAppState state);
 
   /**
    * Get the diagnostics information for the application master.
    * @return the diagnostics information for the application master.
    */
-  public String getDiagnostics();
+  public abstract String getDiagnostics();
 
-  public void setDiagnostics(String diagnostics);
+  public abstract void setDiagnostics(String diagnostics);
 
   /**
    * The finish time of the application.
    * @return the finish time of the application.
    */
-  public long getFinishTime();
+  public abstract long getFinishTime();
 
-  public void setFinishTime(long finishTime);
+  public abstract void setFinishTime(long finishTime);
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Tue Aug 19 23:49:39 2014
@@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
@@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 
-public class ApplicationAttemptStateDataPBImpl
-extends ProtoBase<ApplicationAttemptStateDataProto> 
-implements ApplicationAttemptStateData {
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
 
+public class ApplicationAttemptStateDataPBImpl extends
+    ApplicationAttemptStateData {
   ApplicationAttemptStateDataProto proto = 
       ApplicationAttemptStateDataProto.getDefaultInstance();
   ApplicationAttemptStateDataProto.Builder builder = null;
@@ -60,7 +55,8 @@ implements ApplicationAttemptStateData {
     this.proto = proto;
     viaProto = true;
   }
-  
+
+  @Override
   public ApplicationAttemptStateDataProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -76,7 +72,8 @@ implements ApplicationAttemptStateData {
       builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
     }
     if(this.appAttemptTokens != null) {
-      builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
+      builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat(
+          this.appAttemptTokens));
     }
   }
 
@@ -148,7 +145,8 @@ implements ApplicationAttemptStateData {
     if(!p.hasAppAttemptTokens()) {
       return null;
     }
-    this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
+    this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
+        p.getAppAttemptTokens());
     return appAttemptTokens;
   }
 
@@ -249,24 +247,39 @@ implements ApplicationAttemptStateData {
     builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
   }
 
-  public static ApplicationAttemptStateData newApplicationAttemptStateData(
-      ApplicationAttemptId attemptId, Container container,
-      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
-      String finalTrackingUrl, String diagnostics,
-      FinalApplicationStatus amUnregisteredFinalStatus) {
-    ApplicationAttemptStateData attemptStateData =
-        recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
-    attemptStateData.setAttemptId(attemptId);
-    attemptStateData.setMasterContainer(container);
-    attemptStateData.setAppAttemptTokens(attemptTokens);
-    attemptStateData.setState(finalState);
-    attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
-    attemptStateData.setDiagnostics(diagnostics);
-    attemptStateData.setStartTime(startTime);
-    attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
-    return attemptStateData;
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
   }
 
+  @Override
+  public int getAMContainerExitStatus() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getAmContainerExitStatus();
+  }
+
+  @Override
+  public void setAMContainerExitStatus(int exitStatus) {
+    maybeInitBuilder();
+    builder.setAmContainerExitStatus(exitStatus);
+  }
+
+
+  @Override
+  public boolean equals(Object other) {
+    // Require the exact same class: the old isAssignableFrom test let a
+    // superclass instance (e.g. plain Object) through to cast(), which threw.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+  
   private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
   public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
     return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());
@@ -281,5 +294,4 @@ implements ApplicationAttemptStateData {
   private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
     return ProtoUtils.convertFromProtoFormat(s);
   }
-
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Tue Aug 19 23:49:39 2014
@@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 
-public class ApplicationStateDataPBImpl 
-extends ProtoBase<ApplicationStateDataProto> 
-implements ApplicationStateData {
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
 
+public class ApplicationStateDataPBImpl extends ApplicationStateData {
   ApplicationStateDataProto proto = 
             ApplicationStateDataProto.getDefaultInstance();
   ApplicationStateDataProto.Builder builder = null;
@@ -51,7 +45,8 @@ implements ApplicationStateData {
     this.proto = proto;
     viaProto = true;
   }
-  
+
+  @Override
   public ApplicationStateDataProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -136,7 +131,7 @@ implements ApplicationStateData {
     }
     applicationSubmissionContext = 
         new ApplicationSubmissionContextPBImpl(
-                                          p.getApplicationSubmissionContext());
+            p.getApplicationSubmissionContext());
     return applicationSubmissionContext;
   }
 
@@ -200,21 +195,24 @@ implements ApplicationStateData {
     builder.setFinishTime(finishTime);
   }
 
-  public static ApplicationStateData newApplicationStateData(long submitTime,
-      long startTime, String user,
-      ApplicationSubmissionContext submissionContext, RMAppState state,
-      String diagnostics, long finishTime) {
-
-    ApplicationStateData appState =
-        recordFactory.newRecordInstance(ApplicationStateData.class);
-    appState.setSubmitTime(submitTime);
-    appState.setStartTime(startTime);
-    appState.setUser(user);
-    appState.setApplicationSubmissionContext(submissionContext);
-    appState.setState(state);
-    appState.setDiagnostics(diagnostics);
-    appState.setFinishTime(finishTime);
-    return appState;
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    // Require the exact same class: the old isAssignableFrom test let a
+    // superclass instance (e.g. plain Object) through to cast(), which threw.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
   }
 
   private static String RM_APP_PREFIX = "RMAPP_";

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Tue Aug 19 23:49:39 2014
@@ -19,16 +19,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.util.Collection;
-
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -208,6 +208,14 @@ public interface RMApp extends EventHand
    * @return the flag indicating whether the applications's state is stored.
    */
   boolean isAppFinalStateStored();
+  
+  
+  /**
+   * Nodes on which the containers for this {@link RMApp} ran.
+   * @return the set of nodes that ran any containers from this {@link RMApp};
+   *         more nodes are added as containers for this {@link RMApp} run on them
+   */
+  Set<NodeId> getRanNodes();
 
   /**
    * Create the external user-facing state of ApplicationMaster from the
@@ -215,4 +223,11 @@ public interface RMApp extends EventHand
    * @return the external user-facing state of ApplicationMaster.
    */
   YarnApplicationState createApplicationState();
+  
+  /**
+   * Get RMAppMetrics of the {@link RMApp}.
+   * 
+   * @return metrics
+   */
+  RMAppMetrics getRMAppMetrics();
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Tue Aug 19 23:49:39 2014
@@ -38,6 +38,9 @@ public enum RMAppEventType {
   ATTEMPT_FAILED,
   ATTEMPT_KILLED,
   NODE_UPDATE,
+  
+  // Source: Container and ResourceTracker
+  APP_RUNNING_ON_NODE,
 
   // Source: RMStateStore
   APP_NEW_SAVED,