You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/06/09 21:46:48 UTC

svn commit: r1601492 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ hadoop-yarn/hadoop-yarn-server/ha...

Author: jianhe
Date: Mon Jun  9 19:46:48 2014
New Revision: 1601492

URL: http://svn.apache.org/r1601492
Log:
Merge r1601491 from trunk. YARN-2030. Augmented RMStateStore with state machine. Contributed by Binglin Chang

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Jun  9 19:46:48 2014
@@ -134,6 +134,8 @@ Release 2.5.0 - UNRELEASED
     YARN-2132. ZKRMStateStore.ZKAction#runWithRetries doesn't log the exception
     it encounters. (Vamsee Yarlagadda via kasha)
 
+    YARN-2030. Augmented RMStateStore with state machine. (Binglin Chang via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Mon Jun  9 19:46:48 2014
@@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -314,7 +316,7 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String appIdStr = appId.toString();
     Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     fs.mkdirs(appDirPath);
@@ -334,7 +336,7 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String appIdStr = appId.toString();
     Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
@@ -354,7 +356,7 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@@ -375,7 +377,7 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Mon Jun  9 19:46:48 2014
@@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -80,7 +80,7 @@ public class MemoryRMStateStore extends 
 
   @Override
   public void storeApplicationStateInternal(ApplicationId appId,
-                                     ApplicationStateDataPBImpl appStateData)
+                                     ApplicationStateData appStateData)
       throws Exception {
     ApplicationState appState =
         new ApplicationState(appStateData.getSubmitTime(),
@@ -92,7 +92,7 @@ public class MemoryRMStateStore extends 
 
   @Override
   public void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     ApplicationState updatedAppState =
         new ApplicationState(appStateData.getSubmitTime(),
           appStateData.getStartTime(),
@@ -112,7 +112,7 @@ public class MemoryRMStateStore extends 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptStateData attemptStateData)
       throws Exception {
     Credentials credentials = null;
     if(attemptStateData.getAppAttemptTokens() != null){
@@ -137,7 +137,7 @@ public class MemoryRMStateStore extends 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptStateData attemptStateData)
       throws Exception {
     Credentials credentials = null;
     if (attemptStateData.getAppAttemptTokens() != null) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Mon Jun  9 19:46:48 2014
@@ -25,9 +25,9 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 
 @Unstable
 public class NullRMStateStore extends RMStateStore {
@@ -54,13 +54,13 @@ public class NullRMStateStore extends RM
 
   @Override
   protected void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     // Do nothing
   }
 
   @Override
   protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+      ApplicationAttemptStateData attemptStateData) throws Exception {
     // Do nothing
   }
 
@@ -102,13 +102,13 @@ public class NullRMStateStore extends RM
 
   @Override
   protected void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     // Do nothing 
   }
 
   @Override
   protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+      ApplicationAttemptStateData attemptStateData) throws Exception {
   }
 
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Jun  9 19:46:48 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -31,7 +30,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -50,6 +48,8 @@ import org.apache.hadoop.yarn.security.A
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
 
 @Private
 @Unstable
@@ -83,8 +87,163 @@ public abstract class RMStateStore exten
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
+  private enum RMStateStoreState {
+    DEFAULT
+  };
+
+  private static final StateMachineFactory<RMStateStore,
+                                           RMStateStoreState,
+                                           RMStateStoreEventType, 
+                                           RMStateStoreEvent>
+      stateMachineFactory = new StateMachineFactory<RMStateStore,
+                                                    RMStateStoreState,
+                                                    RMStateStoreEventType,
+                                                    RMStateStoreEvent>(
+      RMStateStoreState.DEFAULT)
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
+
+  private final StateMachine<RMStateStoreState,
+                             RMStateStoreEventType,
+                             RMStateStoreEvent> stateMachine;
+
+  private static class StoreAppTransition
+      implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
+      ApplicationId appId = appState.getAppId();
+      ApplicationStateData appStateData = ApplicationStateData
+          .newInstance(appState);
+      LOG.info("Storing info for app: " + appId);
+      try {
+        store.storeApplicationStateInternal(appId, appStateData);
+        store.notifyDoneStoringApplication(appId, null);
+      } catch (Exception e) {
+        LOG.error("Error storing app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class UpdateAppTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateUpdateAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
+      ApplicationId appId = appState.getAppId();
+      ApplicationStateData appStateData = ApplicationStateData
+          .newInstance(appState);
+      LOG.info("Updating info for app: " + appId);
+      try {
+        store.updateApplicationStateInternal(appId, appStateData);
+        store.notifyDoneUpdatingApplication(appId, null);
+      } catch (Exception e) {
+        LOG.error("Error updating app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class RemoveAppTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreRemoveAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
+          .getAppState();
+      ApplicationId appId = appState.getAppId();
+      LOG.info("Removing info for app: " + appId);
+      try {
+        store.removeApplicationStateInternal(appState);
+      } catch (Exception e) {
+        LOG.error("Error removing app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class StoreAppAttemptTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreAppAttemptEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationAttemptState attemptState =
+          ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+      try {
+        ApplicationAttemptStateData attemptStateData = 
+            ApplicationAttemptStateData.newInstance(attemptState);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+        }
+        store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+            attemptStateData);
+        store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+            null);
+      } catch (Exception e) {
+        LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class UpdateAppAttemptTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationAttemptState attemptState =
+          ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
+      try {
+        ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
+            .newInstance(attemptState);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
+        }
+        store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+            attemptStateData);
+        store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
+            null);
+      } catch (Exception e) {
+        LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
   public RMStateStore() {
     super(RMStateStore.class.getName());
+    stateMachine = stateMachineFactory.make(this);
   }
 
   /**
@@ -390,10 +549,10 @@ public abstract class RMStateStore exten
    * application.
    */
   protected abstract void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception;
+      ApplicationStateData appStateData) throws Exception;
 
   protected abstract void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception;
+      ApplicationStateData appStateData) throws Exception;
   
   @SuppressWarnings("unchecked")
   /**
@@ -428,11 +587,11 @@ public abstract class RMStateStore exten
    */
   protected abstract void storeApplicationAttemptStateInternal(
       ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+      ApplicationAttemptStateData attemptStateData) throws Exception;
 
   protected abstract void updateApplicationAttemptStateInternal(
       ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+      ApplicationAttemptStateData attemptStateData) throws Exception;
 
   /**
    * RMDTSecretManager call this to store the state of a delegation token
@@ -596,105 +755,10 @@ public abstract class RMStateStore exten
 
   // Dispatcher related code
   protected void handleStoreEvent(RMStateStoreEvent event) {
-    if (event.getType().equals(RMStateStoreEventType.STORE_APP)
-        || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
-      ApplicationState appState = null;
-      if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-        appState = ((RMStateStoreAppEvent) event).getAppState();
-      } else {
-        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-        appState = ((RMStateUpdateAppEvent) event).getAppState();
-      }
-
-      Exception storedException = null;
-      ApplicationStateDataPBImpl appStateData =
-          (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
-            .newApplicationStateData(appState.getSubmitTime(),
-              appState.getStartTime(), appState.getUser(),
-              appState.getApplicationSubmissionContext(), appState.getState(),
-              appState.getDiagnostics(), appState.getFinishTime());
-
-      ApplicationId appId =
-          appState.getApplicationSubmissionContext().getApplicationId();
-
-      LOG.info("Storing info for app: " + appId);
-      try {
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-          storeApplicationStateInternal(appId, appStateData);
-          notifyDoneStoringApplication(appId, storedException);
-        } else {
-          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-          updateApplicationStateInternal(appId, appStateData);
-          notifyDoneUpdatingApplication(appId, storedException);
-        }
-      } catch (Exception e) {
-        LOG.error("Error storing/updating app: " + appId, e);
-        notifyStoreOperationFailed(e);
-      }
-    } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
-        || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
-
-      ApplicationAttemptState attemptState = null;
-      if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-        attemptState =
-            ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
-      } else {
-        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-        attemptState =
-            ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
-      }
-
-      Exception storedException = null;
-      Credentials credentials = attemptState.getAppAttemptCredentials();
-      ByteBuffer appAttemptTokens = null;
-      try {
-        if (credentials != null) {
-          DataOutputBuffer dob = new DataOutputBuffer();
-          credentials.writeTokenStorageToStream(dob);
-          appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-        }
-        ApplicationAttemptStateDataPBImpl attemptStateData =
-            (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
-              .newApplicationAttemptStateData(attemptState.getAttemptId(),
-                attemptState.getMasterContainer(), appAttemptTokens,
-                attemptState.getStartTime(), attemptState.getState(),
-                attemptState.getFinalTrackingUrl(),
-                attemptState.getDiagnostics(),
-                attemptState.getFinalApplicationStatus());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
-        }
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-          storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
-              attemptStateData);
-          notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
-              storedException);
-        } else {
-          assert event.getType().equals(
-            RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-          updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
-              attemptStateData);
-          notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
-              storedException);
-        }
-      } catch (Exception e) {
-        LOG.error(
-            "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
-        notifyStoreOperationFailed(e);
-      }
-    } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
-      ApplicationState appState =
-          ((RMStateStoreRemoveAppEvent) event).getAppState();
-      ApplicationId appId = appState.getAppId();
-      LOG.info("Removing info for app: " + appId);
-      try {
-        removeApplicationStateInternal(appState);
-      } catch (Exception e) {
-        LOG.error("Error removing app: " + appId, e);
-        notifyStoreOperationFailed(e);
-      }
-    } else {
-      LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+    try {
+      this.stateMachine.doTransition(event.getType(), event);
+    } catch (InvalidStateTransitonException e) {
+      LOG.error("Can't handle this event at current state", e);
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Mon Jun  9 19:46:48 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -551,7 +553,7 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
@@ -565,7 +567,7 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   public synchronized void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
@@ -587,7 +589,7 @@ public class ZKRMStateStore extends RMSt
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     String appDirPath = getNodePath(rmAppRoot,
         appAttemptId.getApplicationId().toString());
@@ -605,7 +607,7 @@ public class ZKRMStateStore extends RMSt
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     String appIdStr = appAttemptId.getApplicationId().toString();
     String appAttemptIdStr = appAttemptId.toString();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Mon Jun  9 19:46:48 2014
@@ -18,31 +18,73 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
 
 /*
  * Contains the state data that needs to be persisted for an ApplicationAttempt
  */
 @Public
 @Unstable
-public interface ApplicationAttemptStateData {
-  
+public abstract class ApplicationAttemptStateData {
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptId attemptId, Container container,
+      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+      String finalTrackingUrl, String diagnostics,
+      FinalApplicationStatus amUnregisteredFinalStatus) {
+    ApplicationAttemptStateData attemptStateData =
+        Records.newRecord(ApplicationAttemptStateData.class);
+    attemptStateData.setAttemptId(attemptId);
+    attemptStateData.setMasterContainer(container);
+    attemptStateData.setAppAttemptTokens(attemptTokens);
+    attemptStateData.setState(finalState);
+    attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
+    attemptStateData.setDiagnostics(diagnostics);
+    attemptStateData.setStartTime(startTime);
+    attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
+    return attemptStateData;
+  }
+
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptState attemptState) throws IOException {
+    Credentials credentials = attemptState.getAppAttemptCredentials();
+    ByteBuffer appAttemptTokens = null;
+    if (credentials != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    }
+    return newInstance(attemptState.getAttemptId(),
+        attemptState.getMasterContainer(), appAttemptTokens,
+        attemptState.getStartTime(), attemptState.getState(),
+        attemptState.getFinalTrackingUrl(),
+        attemptState.getDiagnostics(),
+        attemptState.getFinalApplicationStatus());
+  }
+
+  public abstract ApplicationAttemptStateDataProto getProto();
+
   /**
    * The ApplicationAttemptId for the application attempt
    * @return ApplicationAttemptId for the application attempt
    */
   @Public
   @Unstable
-  public ApplicationAttemptId getAttemptId();
+  public abstract ApplicationAttemptId getAttemptId();
   
-  public void setAttemptId(ApplicationAttemptId attemptId);
+  public abstract void setAttemptId(ApplicationAttemptId attemptId);
   
   /*
    * The master container running the application attempt
@@ -50,9 +92,9 @@ public interface ApplicationAttemptState
    */
   @Public
   @Unstable
-  public Container getMasterContainer();
+  public abstract Container getMasterContainer();
   
-  public void setMasterContainer(Container container);
+  public abstract void setMasterContainer(Container container);
 
   /**
    * The application attempt tokens that belong to this attempt
@@ -60,17 +102,17 @@ public interface ApplicationAttemptState
    */
   @Public
   @Unstable
-  public ByteBuffer getAppAttemptTokens();
+  public abstract ByteBuffer getAppAttemptTokens();
 
-  public void setAppAttemptTokens(ByteBuffer attemptTokens);
+  public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
 
   /**
    * Get the final state of the application attempt.
    * @return the final state of the application attempt.
    */
-  public RMAppAttemptState getState();
+  public abstract RMAppAttemptState getState();
 
-  public void setState(RMAppAttemptState state);
+  public abstract void setState(RMAppAttemptState state);
 
   /**
    * Get the original not-proxied <em>final tracking url</em> for the
@@ -79,34 +121,34 @@ public interface ApplicationAttemptState
    * @return the original not-proxied <em>final tracking url</em> for the
    *         application
    */
-  public String getFinalTrackingUrl();
+  public abstract String getFinalTrackingUrl();
 
   /**
    * Set the final tracking Url of the AM.
    * @param url
    */
-  public void setFinalTrackingUrl(String url);
+  public abstract void setFinalTrackingUrl(String url);
   /**
    * Get the <em>diagnositic information</em> of the attempt 
    * @return <em>diagnositic information</em> of the attempt
    */
-  public String getDiagnostics();
+  public abstract String getDiagnostics();
 
-  public void setDiagnostics(String diagnostics);
+  public abstract void setDiagnostics(String diagnostics);
 
   /**
    * Get the <em>start time</em> of the application.
    * @return <em>start time</em> of the application
    */
-  public long getStartTime();
+  public abstract long getStartTime();
 
-  public void setStartTime(long startTime);
+  public abstract void setStartTime(long startTime);
 
   /**
    * Get the <em>final finish status</em> of the application.
    * @return <em>final finish status</em> of the application
    */
-  public FinalApplicationStatus getFinalApplicationStatus();
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
 
-  public void setFinalApplicationStatus(FinalApplicationStatus finishState);
+  public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState);
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Mon Jun  9 19:46:48 2014
@@ -24,7 +24,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Contains all the state data that needs to be stored persistently 
@@ -32,19 +35,43 @@ import org.apache.hadoop.yarn.server.res
  */
 @Public
 @Unstable
-public interface ApplicationStateData {
-  
+public abstract class ApplicationStateData {
+  public static ApplicationStateData newInstance(long submitTime,
+      long startTime, String user,
+      ApplicationSubmissionContext submissionContext,
+      RMAppState state, String diagnostics, long finishTime) {
+    ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
+    appState.setSubmitTime(submitTime);
+    appState.setStartTime(startTime);
+    appState.setUser(user);
+    appState.setApplicationSubmissionContext(submissionContext);
+    appState.setState(state);
+    appState.setDiagnostics(diagnostics);
+    appState.setFinishTime(finishTime);
+    return appState;
+  }
+
+  public static ApplicationStateData newInstance(
+      ApplicationState appState) {
+    return newInstance(appState.getSubmitTime(), appState.getStartTime(),
+        appState.getUser(), appState.getApplicationSubmissionContext(),
+        appState.getState(), appState.getDiagnostics(),
+        appState.getFinishTime());
+  }
+
+  public abstract ApplicationStateDataProto getProto();
+
   /**
    * The time at which the application was received by the Resource Manager
    * @return submitTime
    */
   @Public
   @Unstable
-  public long getSubmitTime();
+  public abstract long getSubmitTime();
   
   @Public
   @Unstable
-  public void setSubmitTime(long submitTime);
+  public abstract void setSubmitTime(long submitTime);
 
   /**
    * Get the <em>start time</em> of the application.
@@ -63,11 +90,11 @@ public interface ApplicationStateData {
    */
   @Public
   @Unstable
-  public void setUser(String user);
+  public abstract void setUser(String user);
   
   @Public
   @Unstable
-  public String getUser();
+  public abstract String getUser();
   
   /**
    * The {@link ApplicationSubmissionContext} for the application
@@ -76,34 +103,34 @@ public interface ApplicationStateData {
    */
   @Public
   @Unstable
-  public ApplicationSubmissionContext getApplicationSubmissionContext();
+  public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
   
   @Public
   @Unstable
-  public void setApplicationSubmissionContext(
+  public abstract void setApplicationSubmissionContext(
       ApplicationSubmissionContext context);
 
   /**
    * Get the final state of the application.
    * @return the final state of the application.
    */
-  public RMAppState getState();
+  public abstract RMAppState getState();
 
-  public void setState(RMAppState state);
+  public abstract void setState(RMAppState state);
 
   /**
    * Get the diagnostics information for the application master.
    * @return the diagnostics information for the application master.
    */
-  public String getDiagnostics();
+  public abstract String getDiagnostics();
 
-  public void setDiagnostics(String diagnostics);
+  public abstract void setDiagnostics(String diagnostics);
 
   /**
    * The finish time of the application.
    * @return the finish time of the application.,
    */
-  public long getFinishTime();
+  public abstract long getFinishTime();
 
-  public void setFinishTime(long finishTime);
+  public abstract void setFinishTime(long finishTime);
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Mon Jun  9 19:46:48 2014
@@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
@@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 
-public class ApplicationAttemptStateDataPBImpl
-extends ProtoBase<ApplicationAttemptStateDataProto> 
-implements ApplicationAttemptStateData {
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
 
+public class ApplicationAttemptStateDataPBImpl extends
+    ApplicationAttemptStateData {
   ApplicationAttemptStateDataProto proto = 
       ApplicationAttemptStateDataProto.getDefaultInstance();
   ApplicationAttemptStateDataProto.Builder builder = null;
@@ -60,7 +55,8 @@ implements ApplicationAttemptStateData {
     this.proto = proto;
     viaProto = true;
   }
-  
+
+  @Override
   public ApplicationAttemptStateDataProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -76,7 +72,8 @@ implements ApplicationAttemptStateData {
       builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
     }
     if(this.appAttemptTokens != null) {
-      builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
+      builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat(
+          this.appAttemptTokens));
     }
   }
 
@@ -148,7 +145,8 @@ implements ApplicationAttemptStateData {
     if(!p.hasAppAttemptTokens()) {
       return null;
     }
-    this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
+    this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
+        p.getAppAttemptTokens());
     return appAttemptTokens;
   }
 
@@ -249,24 +247,26 @@ implements ApplicationAttemptStateData {
     builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
   }
 
-  public static ApplicationAttemptStateData newApplicationAttemptStateData(
-      ApplicationAttemptId attemptId, Container container,
-      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
-      String finalTrackingUrl, String diagnostics,
-      FinalApplicationStatus amUnregisteredFinalStatus) {
-    ApplicationAttemptStateData attemptStateData =
-        recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
-    attemptStateData.setAttemptId(attemptId);
-    attemptStateData.setMasterContainer(container);
-    attemptStateData.setAppAttemptTokens(attemptTokens);
-    attemptStateData.setState(finalState);
-    attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
-    attemptStateData.setDiagnostics(diagnostics);
-    attemptStateData.setStartTime(startTime);
-    attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
-    return attemptStateData;
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
   }
 
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+  
   private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
   public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
     return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Mon Jun  9 19:46:48 2014
@@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 
-public class ApplicationStateDataPBImpl 
-extends ProtoBase<ApplicationStateDataProto> 
-implements ApplicationStateData {
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
 
+public class ApplicationStateDataPBImpl extends ApplicationStateData {
   ApplicationStateDataProto proto = 
             ApplicationStateDataProto.getDefaultInstance();
   ApplicationStateDataProto.Builder builder = null;
@@ -51,7 +45,8 @@ implements ApplicationStateData {
     this.proto = proto;
     viaProto = true;
   }
-  
+
+  @Override
   public ApplicationStateDataProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -136,7 +131,7 @@ implements ApplicationStateData {
     }
     applicationSubmissionContext = 
         new ApplicationSubmissionContextPBImpl(
-                                          p.getApplicationSubmissionContext());
+            p.getApplicationSubmissionContext());
     return applicationSubmissionContext;
   }
 
@@ -200,21 +195,24 @@ implements ApplicationStateData {
     builder.setFinishTime(finishTime);
   }
 
-  public static ApplicationStateData newApplicationStateData(long submitTime,
-      long startTime, String user,
-      ApplicationSubmissionContext submissionContext, RMAppState state,
-      String diagnostics, long finishTime) {
-
-    ApplicationStateData appState =
-        recordFactory.newRecordInstance(ApplicationStateData.class);
-    appState.setSubmitTime(submitTime);
-    appState.setStartTime(startTime);
-    appState.setUser(user);
-    appState.setApplicationSubmissionContext(submissionContext);
-    appState.setState(state);
-    appState.setDiagnostics(diagnostics);
-    appState.setFinishTime(finishTime);
-    return appState;
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
   }
 
   private static String RM_APP_PREFIX = "RMAPP_";

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Jun  9 19:46:48 2014
@@ -84,8 +84,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -612,7 +612,7 @@ public class TestRMRestart {
 
       @Override
       public void updateApplicationStateInternal(ApplicationId appId,
-          ApplicationStateDataPBImpl appStateData) throws Exception {
+          ApplicationStateData appStateData) throws Exception {
         if (count == 0) {
           // do nothing; simulate app final state is not saved.
           LOG.info(appId + " final state is not saved.");
@@ -760,14 +760,14 @@ public class TestRMRestart {
       @Override
       public synchronized void storeApplicationAttemptStateInternal(
           ApplicationAttemptId attemptId,
-          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+          ApplicationAttemptStateData attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
 
       @Override
       public synchronized void updateApplicationAttemptStateInternal(
           ApplicationAttemptId attemptId,
-          ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+          ApplicationAttemptStateData attemptStateData) throws Exception {
         // ignore attempt saving request.
       }
     };
@@ -1862,7 +1862,7 @@ public class TestRMRestart {
 
     @Override
     public void updateApplicationStateInternal(ApplicationId appId,
-        ApplicationStateDataPBImpl appStateData) throws Exception {
+        ApplicationStateData appStateData) throws Exception {
       updateApp = ++count;
       super.updateApplicationStateInternal(appId, appStateData);
     }
@@ -1871,7 +1871,7 @@ public class TestRMRestart {
     public synchronized void
         updateApplicationAttemptStateInternal(
             ApplicationAttemptId attemptId,
-            ApplicationAttemptStateDataPBImpl attemptStateData)
+            ApplicationAttemptStateData attemptStateData)
             throws Exception {
       updateAttempt = ++count;
       super.updateApplicationAttemptStateInternal(attemptId,

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1601492&r1=1601491&r2=1601492&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Mon Jun  9 19:46:48 2014
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
@@ -213,9 +213,8 @@ public class TestFSRMStateStore extends 
           try {
             store.storeApplicationStateInternal(
                 ApplicationId.newInstance(100L, 1),
-                (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
-                    .newApplicationStateData(111, 111, "user", null,
-                        RMAppState.ACCEPTED, "diagnostics", 333));
+                ApplicationStateData.newInstance(111, 111, "user", null,
+                    RMAppState.ACCEPTED, "diagnostics", 333));
           } catch (Exception e) {
             // TODO 0 datanode exception will not be retried by dfs client, fix
             // that separately.