You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cl...@apache.org on 2014/06/16 20:14:12 UTC

svn commit: r1602947 [2/5] - in /hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/ha...

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,17 +30,18 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -53,10 +58,12 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -185,6 +192,9 @@ public class TestNodeManagerResync {
         TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE);
     final Container container =
         TestNodeStatusUpdater.getMockContainer(testCompleteContainer);
+    NMContainerStatus report =
+        createNMContainerStatus(2, ContainerState.COMPLETE);
+    when(container.getNMContainerStatus()).thenReturn(report);
     NodeManager nm = new NodeManager() {
       int registerCount = 0;
 
@@ -203,7 +213,7 @@ public class TestNodeManagerResync {
                 if (registerCount == 0) {
                   // first register, no containers info.
                   try {
-                    Assert.assertEquals(0, request.getContainerStatuses()
+                    Assert.assertEquals(0, request.getNMContainerStatuses()
                       .size());
                   } catch (AssertionError error) {
                     error.printStackTrace();
@@ -214,8 +224,8 @@ public class TestNodeManagerResync {
                     testCompleteContainer.getContainerId(), container);
                 } else {
                   // second register contains the completed container info.
-                  List<ContainerStatus> statuses =
-                      request.getContainerStatuses();
+                  List<NMContainerStatus> statuses =
+                      request.getNMContainerStatuses();
                   try {
                     Assert.assertEquals(1, statuses.size());
                     Assert.assertEquals(testCompleteContainer.getContainerId(),
@@ -510,4 +520,16 @@ public class TestNodeManagerResync {
         }
       }
     }}
+
+  public static NMContainerStatus createNMContainerStatus(int id,
+      ContainerState containerState) {
+    ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    ContainerId containerId = ContainerId.newInstance(applicationAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+          Resource.newInstance(1024, 1), "recover container", 0);
+    return containerReport;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Mon Jun 16 18:13:57 2014
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.junit.Assert;
 
 import org.apache.commons.logging.LogFactory;
@@ -68,7 +69,6 @@ import org.apache.hadoop.yarn.security.C
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
@@ -348,8 +348,7 @@ public class TestContainerManager extend
         GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus = 
         containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
-    int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
-      ExitCode.TERMINATED.getExitCode();
+    int expectedExitCode = ContainerExitStatus.KILLED_BY_APPMASTER;
     Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
 
     // Assert that the process is not alive anymore

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Mon Jun 16 18:13:57 2014
@@ -17,6 +17,7 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -319,7 +320,7 @@ public class TestContainer {
       assertEquals(ContainerState.NEW, wc.c.getContainerState());
       wc.killContainer();
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
-      assertEquals(ExitCode.TERMINATED.getExitCode(),
+      assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
           wc.c.cloneAndGetContainerStatus().getExitStatus());
       assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
           .contains("KillRequest"));
@@ -339,7 +340,7 @@ public class TestContainer {
       assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState());
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
-      assertEquals(ExitCode.TERMINATED.getExitCode(),
+      assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
           wc.c.cloneAndGetContainerStatus().getExitStatus());
       assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
           .contains("KillRequest"));
@@ -898,12 +899,14 @@ public class TestContainer {
     }
 
     public void killContainer() {
-      c.handle(new ContainerKillEvent(cId, "KillRequest"));
+      c.handle(new ContainerKillEvent(cId,
+          ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+          "KillRequest"));
       drainDispatcherEvents();
     }
 
     public void containerKilledOnRequest() {
-      int exitCode = ExitCode.FORCE_KILLED.getExitCode();
+      int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
       String diagnosticMsg = "Container completed with exit code " + exitCode;
       c.handle(new ContainerExitEvent(cId,
           ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
 
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -73,7 +74,6 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -604,8 +604,7 @@ public class TestContainerLaunch extends
         GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus = 
         containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
-    int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
-      ExitCode.TERMINATED.getExitCode();
+    int expectedExitCode = ContainerExitStatus.KILLED_BY_APPMASTER;
     Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
 
     // Assert that the process is not alive anymore
@@ -717,7 +716,7 @@ public class TestContainerLaunch extends
     ContainerStatus containerStatus = 
         containerManager.getContainerStatuses(gcsRequest)
           .getContainerStatuses().get(0);
-    Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
+    Assert.assertEquals(ContainerExitStatus.KILLED_BY_APPMASTER,
         containerStatus.getExitStatus());
 
     // Now verify the contents of the file.  Script generates a message when it

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Mon Jun 16 18:13:57 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -60,7 +61,6 @@ import org.apache.hadoop.yarn.event.Asyn
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@@ -270,7 +270,7 @@ public class TestContainersMonitor exten
         GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus =
         containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
-    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
+    Assert.assertEquals(ContainerExitStatus.KILLED_EXCEEDED_VMEM,
         containerStatus.getExitStatus());
     String expectedMsgPattern =
         "Container \\[pid=" + pid + ",containerID=" + cId

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java Mon Jun 16 18:13:57 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -134,4 +135,9 @@ public class MockContainer implements Co
   public ContainerTokenIdentifier getContainerTokenIdentifier() {
     return this.containerTokenIdentifier;
   }
+
+  @Override
+  public NMContainerStatus getNMContainerStatus() {
+    return null;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Mon Jun 16 18:13:57 2014
@@ -99,4 +99,6 @@ public interface RMContext {
       RMApplicationHistoryWriter rmApplicationHistoryWriter);
 
   ConfigurationProvider getConfigurationProvider();
+
+  boolean isWorkPreservingRecoveryEnabled();
 }
\ No newline at end of file

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Mon Jun 16 18:13:57 2014
@@ -60,6 +60,7 @@ public class RMContextImpl implements RM
     = new ConcurrentHashMap<String, RMNode>();
 
   private boolean isHAEnabled;
+  private boolean isWorkPreservingRecoveryEnabled;
   private HAServiceState haServiceState =
       HAServiceProtocol.HAServiceState.INITIALIZING;
   
@@ -329,6 +330,15 @@ public class RMContextImpl implements RM
     }
   }
 
+  public void setWorkPreservingRecoveryEnabled(boolean enabled) {
+    this.isWorkPreservingRecoveryEnabled = enabled;
+  }
+
+  @Override
+  public boolean isWorkPreservingRecoveryEnabled() {
+    return this.isWorkPreservingRecoveryEnabled;
+  }
+
   @Override
   public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
     return rmApplicationHistoryWriter;

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Mon Jun 16 18:13:57 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Utility methods to aid serving RM data through the REST and RPC APIs
@@ -225,4 +228,13 @@ public class RMServerUtils {
     }
   }
 
+  /**
+   * Statically defined dummy ApplicationResourceUsageReport.  Used as
+   * a return value when a valid report cannot be found.
+   */
+  public static final ApplicationResourceUsageReport
+    DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
+      BuilderUtils.newApplicationResourceUsageReport(-1, -1,
+          Resources.createResource(-1, -1), Resources.createResource(-1, -1),
+          Resources.createResource(-1, -1));
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon Jun 16 18:13:57 2014
@@ -327,7 +327,7 @@ public class ResourceManager extends Com
    * RMActiveServices handles all the Active services in the RM.
    */
   @Private
-  class RMActiveServices extends CompositeService {
+  public class RMActiveServices extends CompositeService {
 
     private DelegationTokenRenewer delegationTokenRenewer;
     private EventHandler<SchedulerEvent> schedulerDispatcher;
@@ -364,9 +364,15 @@ public class ResourceManager extends Com
           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
 
       RMStateStore rmStore = null;
-      if(isRecoveryEnabled) {
+      if (isRecoveryEnabled) {
         recoveryEnabled = true;
-        rmStore =  RMStateStoreFactory.getStore(conf);
+        rmStore = RMStateStoreFactory.getStore(conf);
+        boolean isWorkPreservingRecoveryEnabled =
+            conf.getBoolean(
+              YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+              YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
+        rmContext
+          .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
       } else {
         recoveryEnabled = false;
         rmStore = new NullRMStateStore();
@@ -401,6 +407,8 @@ public class ResourceManager extends Com
 
       // Initialize the scheduler
       scheduler = createScheduler();
+      scheduler.setRMContext(rmContext);
+      addIfService(scheduler);
       rmContext.setScheduler(scheduler);
 
       schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +437,6 @@ public class ResourceManager extends Com
       DefaultMetricsSystem.initialize("ResourceManager");
       JvmMetrics.initSingleton("ResourceManager", null);
 
-      try {
-        scheduler.reinitialize(conf, rmContext);
-      } catch (IOException ioe) {
-        throw new RuntimeException("Failed to initialize scheduler", ioe);
-      }
-
       // creating monitors that handle preemption
       createPolicyMonitors();
 
@@ -524,11 +526,9 @@ public class ResourceManager extends Com
                   (PreemptableResourceScheduler) scheduler));
           for (SchedulingEditPolicy policy : policies) {
             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
-            policy.init(conf, rmContext.getDispatcher().getEventHandler(),
-                (PreemptableResourceScheduler) scheduler);
             // periodically check whether we need to take action to guarantee
             // constraints
-            SchedulingMonitor mon = new SchedulingMonitor(policy);
+            SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
             addService(mon);
           }
         } else {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Jun 16 18:13:57 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -195,7 +196,7 @@ public class ResourceTrackerService exte
    */
   @SuppressWarnings("unchecked")
   @VisibleForTesting
-  void handleContainerStatus(ContainerStatus containerStatus) {
+  void handleNMContainerStatus(NMContainerStatus containerStatus) {
     ApplicationAttemptId appAttemptId =
         containerStatus.getContainerId().getApplicationAttemptId();
     RMApp rmApp =
@@ -219,11 +220,14 @@ public class ResourceTrackerService exte
     RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
     Container masterContainer = rmAppAttempt.getMasterContainer();
     if (masterContainer.getId().equals(containerStatus.getContainerId())
-        && containerStatus.getState() == ContainerState.COMPLETE) {
+        && containerStatus.getContainerState() == ContainerState.COMPLETE) {
+      ContainerStatus status =
+          ContainerStatus.newInstance(containerStatus.getContainerId(),
+            containerStatus.getContainerState(), containerStatus.getDiagnostics(),
+            containerStatus.getContainerExitStatus());
       // sending master container finished event.
       RMAppAttemptContainerFinishedEvent evt =
-          new RMAppAttemptContainerFinishedEvent(appAttemptId,
-              containerStatus);
+          new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
       rmContext.getDispatcher().getEventHandler().handle(evt);
     }
   }
@@ -240,11 +244,13 @@ public class ResourceTrackerService exte
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
 
-    if (!request.getContainerStatuses().isEmpty()) {
-      LOG.info("received container statuses on node manager register :"
-          + request.getContainerStatuses());
-      for (ContainerStatus containerStatus : request.getContainerStatuses()) {
-        handleContainerStatus(containerStatus);
+    if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+      if (!request.getNMContainerStatuses().isEmpty()) {
+        LOG.info("received container statuses on node manager register :"
+            + request.getNMContainerStatuses());
+        for (NMContainerStatus status : request.getNMContainerStatuses()) {
+          handleNMContainerStatus(status);
+        }
       }
     }
     RegisterNodeManagerResponse response = recordFactory
@@ -305,7 +311,7 @@ public class ResourceTrackerService exte
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+          new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java Mon Jun 16 18:13:57 2014
@@ -21,6 +21,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -34,18 +36,29 @@ public class SchedulingMonitor extends A
   private Thread checkerThread;
   private volatile boolean stopped;
   private long monitorInterval;
+  private RMContext rmContext;
 
-  public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
+  public SchedulingMonitor(RMContext rmContext,
+      SchedulingEditPolicy scheduleEditPolicy) {
     super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
     this.scheduleEditPolicy = scheduleEditPolicy;
-    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+    this.rmContext = rmContext;
   }
 
   public long getMonitorInterval() {
     return monitorInterval;
   }
+  
+  @VisibleForTesting
+  public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
+    return scheduleEditPolicy;
+  }
 
+  @SuppressWarnings("unchecked")
   public void serviceInit(Configuration conf) throws Exception {
+    scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+        (PreemptableResourceScheduler) rmContext.getScheduler());
+    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
     super.serviceInit(conf);
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Mon Jun 16 18:13:57 2014
@@ -165,6 +165,11 @@ public class ProportionalCapacityPreempt
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
   }
+  
+  @VisibleForTesting
+  public ResourceCalculator getResourceCalculator() {
+    return rc;
+  }
 
   @Override
   public void editSchedule(){
@@ -203,7 +208,9 @@ public class ProportionalCapacityPreempt
     Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
         getContainersToPreempt(queues, clusterResources);
 
-    logToCSV(queues);
+    if (LOG.isDebugEnabled()) {
+      logToCSV(queues);
+    }
 
     // if we are in observeOnly mode return before any action is taken
     if (observeOnly) {
@@ -603,7 +610,7 @@ public class ProportionalCapacityPreempt
       sb.append(", ");
       tq.appendLogString(sb);
     }
-    LOG.info(sb.toString());
+    LOG.debug(sb.toString());
   }
 
   /**

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -314,7 +316,7 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String appIdStr = appId.toString();
     Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     fs.mkdirs(appDirPath);
@@ -334,7 +336,7 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String appIdStr = appId.toString();
     Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
@@ -354,7 +356,7 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@@ -375,7 +377,7 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -80,7 +80,7 @@ public class MemoryRMStateStore extends 
 
   @Override
   public void storeApplicationStateInternal(ApplicationId appId,
-                                     ApplicationStateDataPBImpl appStateData)
+                                     ApplicationStateData appStateData)
       throws Exception {
     ApplicationState appState =
         new ApplicationState(appStateData.getSubmitTime(),
@@ -92,7 +92,7 @@ public class MemoryRMStateStore extends 
 
   @Override
   public void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     ApplicationState updatedAppState =
         new ApplicationState(appStateData.getSubmitTime(),
           appStateData.getStartTime(),
@@ -112,7 +112,7 @@ public class MemoryRMStateStore extends 
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptStateData attemptStateData)
       throws Exception {
     Credentials credentials = null;
     if(attemptStateData.getAppAttemptTokens() != null){
@@ -137,7 +137,7 @@ public class MemoryRMStateStore extends 
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData)
+      ApplicationAttemptStateData attemptStateData)
       throws Exception {
     Credentials credentials = null;
     if (attemptStateData.getAppAttemptTokens() != null) {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -25,9 +25,9 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 
 @Unstable
 public class NullRMStateStore extends RMStateStore {
@@ -54,13 +54,13 @@ public class NullRMStateStore extends RM
 
   @Override
   protected void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     // Do nothing
   }
 
   @Override
   protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+      ApplicationAttemptStateData attemptStateData) throws Exception {
     // Do nothing
   }
 
@@ -102,13 +102,13 @@ public class NullRMStateStore extends RM
 
   @Override
   protected void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception {
+      ApplicationStateData appStateData) throws Exception {
     // Do nothing 
   }
 
   @Override
   protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+      ApplicationAttemptStateData attemptStateData) throws Exception {
   }
 
   @Override

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Jun 16 18:13:57 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -31,7 +30,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -50,6 +48,8 @@ import org.apache.hadoop.yarn.security.A
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
 
 @Private
 @Unstable
@@ -83,8 +87,163 @@ public abstract class RMStateStore exten
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
+  private enum RMStateStoreState {
+    DEFAULT
+  };
+
+  private static final StateMachineFactory<RMStateStore,
+                                           RMStateStoreState,
+                                           RMStateStoreEventType, 
+                                           RMStateStoreEvent>
+      stateMachineFactory = new StateMachineFactory<RMStateStore,
+                                                    RMStateStoreState,
+                                                    RMStateStoreEventType,
+                                                    RMStateStoreEvent>(
+      RMStateStoreState.DEFAULT)
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
+      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
+
+  private final StateMachine<RMStateStoreState,
+                             RMStateStoreEventType,
+                             RMStateStoreEvent> stateMachine;
+
+  private static class StoreAppTransition
+      implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
+      ApplicationId appId = appState.getAppId();
+      ApplicationStateData appStateData = ApplicationStateData
+          .newInstance(appState);
+      LOG.info("Storing info for app: " + appId);
+      try {
+        store.storeApplicationStateInternal(appId, appStateData);
+        store.notifyDoneStoringApplication(appId, null);
+      } catch (Exception e) {
+        LOG.error("Error storing app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class UpdateAppTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateUpdateAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
+      ApplicationId appId = appState.getAppId();
+      ApplicationStateData appStateData = ApplicationStateData
+          .newInstance(appState);
+      LOG.info("Updating info for app: " + appId);
+      try {
+        store.updateApplicationStateInternal(appId, appStateData);
+        store.notifyDoneUpdatingApplication(appId, null);
+      } catch (Exception e) {
+        LOG.error("Error updating app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class RemoveAppTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreRemoveAppEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
+          .getAppState();
+      ApplicationId appId = appState.getAppId();
+      LOG.info("Removing info for app: " + appId);
+      try {
+        store.removeApplicationStateInternal(appState);
+      } catch (Exception e) {
+        LOG.error("Error removing app: " + appId, e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class StoreAppAttemptTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateStoreAppAttemptEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationAttemptState attemptState =
+          ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+      try {
+        ApplicationAttemptStateData attemptStateData = 
+            ApplicationAttemptStateData.newInstance(attemptState);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+        }
+        store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+            attemptStateData);
+        store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+            null);
+      } catch (Exception e) {
+        LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
+  private static class UpdateAppAttemptTransition implements
+      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+    @Override
+    public void transition(RMStateStore store, RMStateStoreEvent event) {
+      if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
+        // should never happen
+        LOG.error("Illegal event type: " + event.getClass());
+        return;
+      }
+      ApplicationAttemptState attemptState =
+          ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
+      try {
+        ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
+            .newInstance(attemptState);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
+        }
+        store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+            attemptStateData);
+        store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
+            null);
+      } catch (Exception e) {
+        LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
+        store.notifyStoreOperationFailed(e);
+      }
+    };
+  }
+
   public RMStateStore() {
     super(RMStateStore.class.getName());
+    stateMachine = stateMachineFactory.make(this);
   }
 
   /**
@@ -390,10 +549,10 @@ public abstract class RMStateStore exten
    * application.
    */
   protected abstract void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception;
+      ApplicationStateData appStateData) throws Exception;
 
   protected abstract void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateData) throws Exception;
+      ApplicationStateData appStateData) throws Exception;
   
   @SuppressWarnings("unchecked")
   /**
@@ -428,11 +587,11 @@ public abstract class RMStateStore exten
    */
   protected abstract void storeApplicationAttemptStateInternal(
       ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+      ApplicationAttemptStateData attemptStateData) throws Exception;
 
   protected abstract void updateApplicationAttemptStateInternal(
       ApplicationAttemptId attemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+      ApplicationAttemptStateData attemptStateData) throws Exception;
 
   /**
    * RMDTSecretManager call this to store the state of a delegation token
@@ -596,105 +755,10 @@ public abstract class RMStateStore exten
 
   // Dispatcher related code
   protected void handleStoreEvent(RMStateStoreEvent event) {
-    if (event.getType().equals(RMStateStoreEventType.STORE_APP)
-        || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
-      ApplicationState appState = null;
-      if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-        appState = ((RMStateStoreAppEvent) event).getAppState();
-      } else {
-        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-        appState = ((RMStateUpdateAppEvent) event).getAppState();
-      }
-
-      Exception storedException = null;
-      ApplicationStateDataPBImpl appStateData =
-          (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
-            .newApplicationStateData(appState.getSubmitTime(),
-              appState.getStartTime(), appState.getUser(),
-              appState.getApplicationSubmissionContext(), appState.getState(),
-              appState.getDiagnostics(), appState.getFinishTime());
-
-      ApplicationId appId =
-          appState.getApplicationSubmissionContext().getApplicationId();
-
-      LOG.info("Storing info for app: " + appId);
-      try {
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
-          storeApplicationStateInternal(appId, appStateData);
-          notifyDoneStoringApplication(appId, storedException);
-        } else {
-          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
-          updateApplicationStateInternal(appId, appStateData);
-          notifyDoneUpdatingApplication(appId, storedException);
-        }
-      } catch (Exception e) {
-        LOG.error("Error storing/updating app: " + appId, e);
-        notifyStoreOperationFailed(e);
-      }
-    } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
-        || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
-
-      ApplicationAttemptState attemptState = null;
-      if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-        attemptState =
-            ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
-      } else {
-        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-        attemptState =
-            ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
-      }
-
-      Exception storedException = null;
-      Credentials credentials = attemptState.getAppAttemptCredentials();
-      ByteBuffer appAttemptTokens = null;
-      try {
-        if (credentials != null) {
-          DataOutputBuffer dob = new DataOutputBuffer();
-          credentials.writeTokenStorageToStream(dob);
-          appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-        }
-        ApplicationAttemptStateDataPBImpl attemptStateData =
-            (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
-              .newApplicationAttemptStateData(attemptState.getAttemptId(),
-                attemptState.getMasterContainer(), appAttemptTokens,
-                attemptState.getStartTime(), attemptState.getState(),
-                attemptState.getFinalTrackingUrl(),
-                attemptState.getDiagnostics(),
-                attemptState.getFinalApplicationStatus());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
-        }
-        if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
-          storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
-              attemptStateData);
-          notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
-              storedException);
-        } else {
-          assert event.getType().equals(
-            RMStateStoreEventType.UPDATE_APP_ATTEMPT);
-          updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
-              attemptStateData);
-          notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
-              storedException);
-        }
-      } catch (Exception e) {
-        LOG.error(
-            "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
-        notifyStoreOperationFailed(e);
-      }
-    } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
-      ApplicationState appState =
-          ((RMStateStoreRemoveAppEvent) event).getAppState();
-      ApplicationId appId = appState.getAppId();
-      LOG.info("Removing info for app: " + appId);
-      try {
-        removeApplicationStateInternal(appState);
-      } catch (Exception e) {
-        LOG.error("Error removing app: " + appId, e);
-        notifyStoreOperationFailed(e);
-      }
-    } else {
-      LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+    try {
+      this.stateMachine.doTransition(event.getType(), event);
+    } catch (InvalidStateTransitonException e) {
+      LOG.error("Can't handle this event at current state", e);
     }
   }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Mon Jun 16 18:13:57 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -546,12 +548,12 @@ public class ZKRMStateStore extends RMSt
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     }
-    LOG.info("Done Loading applications from ZK state store");
+    LOG.debug("Done Loading applications from ZK state store");
   }
 
   @Override
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
@@ -565,7 +567,7 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   public synchronized void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
@@ -579,7 +581,7 @@ public class ZKRMStateStore extends RMSt
     } else {
       createWithRetries(nodeUpdatePath, appStateData, zkAcl,
         CreateMode.PERSISTENT);
-      LOG.info(appId + " znode didn't exist. Created a new znode to"
+      LOG.debug(appId + " znode didn't exist. Created a new znode to"
           + " update the application state.");
     }
   }
@@ -587,7 +589,7 @@ public class ZKRMStateStore extends RMSt
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     String appDirPath = getNodePath(rmAppRoot,
         appAttemptId.getApplicationId().toString());
@@ -605,7 +607,7 @@ public class ZKRMStateStore extends RMSt
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     String appIdStr = appAttemptId.getApplicationId().toString();
     String appAttemptIdStr = appAttemptId.toString();
@@ -622,7 +624,7 @@ public class ZKRMStateStore extends RMSt
     } else {
       createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
         CreateMode.PERSISTENT);
-      LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+      LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
           + " update the application attempt state.");
     }
   }
@@ -671,7 +673,7 @@ public class ZKRMStateStore extends RMSt
     if (existsWithRetries(nodeRemovePath, true) != null) {
       opList.add(Op.delete(nodeRemovePath, -1));
     } else {
-      LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
     doMultiWithRetries(opList);
   }
@@ -688,7 +690,7 @@ public class ZKRMStateStore extends RMSt
       // in case znode doesn't exist
       addStoreOrUpdateOps(
           opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
-      LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
     } else {
       // in case znode exists
       addStoreOrUpdateOps(
@@ -770,7 +772,7 @@ public class ZKRMStateStore extends RMSt
     if (existsWithRetries(nodeRemovePath, true) != null) {
       doMultiWithRetries(Op.delete(nodeRemovePath, -1));
     } else {
-      LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+      LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
   }
 
@@ -823,7 +825,7 @@ public class ZKRMStateStore extends RMSt
         case Expired:
           // the connection got terminated because of session timeout
           // call listener to reconnect
-          LOG.info("Session expired");
+          LOG.info("ZKRMStateStore Session expired");
           createConnection();
           break;
         default:
@@ -998,13 +1000,13 @@ public class ZKRMStateStore extends RMSt
             throw new StoreFencedException();
           }
         } catch (KeeperException ke) {
+          LOG.info("Exception while executing a ZK operation.", ke);
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
-            LOG.info("Waiting for zookeeper to be connected, retry no. + "
-                + retry);
+            LOG.info("Retrying operation on ZK. Retry no. " + retry);
             Thread.sleep(zkRetryInterval);
             continue;
           }
-          LOG.debug("Error while doing ZK operation.", ke);
+          LOG.info("Maxed out ZK retries. Giving up!");
           throw ke;
         }
       }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Mon Jun 16 18:13:57 2014
@@ -18,31 +18,73 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
 
 /*
  * Contains the state data that needs to be persisted for an ApplicationAttempt
  */
 @Public
 @Unstable
-public interface ApplicationAttemptStateData {
-  
+public abstract class ApplicationAttemptStateData {
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptId attemptId, Container container,
+      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+      String finalTrackingUrl, String diagnostics,
+      FinalApplicationStatus amUnregisteredFinalStatus) {
+    ApplicationAttemptStateData attemptStateData =
+        Records.newRecord(ApplicationAttemptStateData.class);
+    attemptStateData.setAttemptId(attemptId);
+    attemptStateData.setMasterContainer(container);
+    attemptStateData.setAppAttemptTokens(attemptTokens);
+    attemptStateData.setState(finalState);
+    attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
+    attemptStateData.setDiagnostics(diagnostics);
+    attemptStateData.setStartTime(startTime);
+    attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
+    return attemptStateData;
+  }
+
+  public static ApplicationAttemptStateData newInstance(
+      ApplicationAttemptState attemptState) throws IOException {
+    Credentials credentials = attemptState.getAppAttemptCredentials();
+    ByteBuffer appAttemptTokens = null;
+    if (credentials != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    }
+    return newInstance(attemptState.getAttemptId(),
+        attemptState.getMasterContainer(), appAttemptTokens,
+        attemptState.getStartTime(), attemptState.getState(),
+        attemptState.getFinalTrackingUrl(),
+        attemptState.getDiagnostics(),
+        attemptState.getFinalApplicationStatus());
+  }
+
+  public abstract ApplicationAttemptStateDataProto getProto();
+
   /**
    * The ApplicationAttemptId for the application attempt
    * @return ApplicationAttemptId for the application attempt
    */
   @Public
   @Unstable
-  public ApplicationAttemptId getAttemptId();
+  public abstract ApplicationAttemptId getAttemptId();
   
-  public void setAttemptId(ApplicationAttemptId attemptId);
+  public abstract void setAttemptId(ApplicationAttemptId attemptId);
   
   /*
    * The master container running the application attempt
@@ -50,9 +92,9 @@ public interface ApplicationAttemptState
    */
   @Public
   @Unstable
-  public Container getMasterContainer();
+  public abstract Container getMasterContainer();
   
-  public void setMasterContainer(Container container);
+  public abstract void setMasterContainer(Container container);
 
   /**
    * The application attempt tokens that belong to this attempt
@@ -60,17 +102,17 @@ public interface ApplicationAttemptState
    */
   @Public
   @Unstable
-  public ByteBuffer getAppAttemptTokens();
+  public abstract ByteBuffer getAppAttemptTokens();
 
-  public void setAppAttemptTokens(ByteBuffer attemptTokens);
+  public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
 
   /**
    * Get the final state of the application attempt.
    * @return the final state of the application attempt.
    */
-  public RMAppAttemptState getState();
+  public abstract RMAppAttemptState getState();
 
-  public void setState(RMAppAttemptState state);
+  public abstract void setState(RMAppAttemptState state);
 
   /**
    * Get the original not-proxied <em>final tracking url</em> for the
@@ -79,34 +121,34 @@ public interface ApplicationAttemptState
    * @return the original not-proxied <em>final tracking url</em> for the
    *         application
    */
-  public String getFinalTrackingUrl();
+  public abstract String getFinalTrackingUrl();
 
   /**
    * Set the final tracking Url of the AM.
    * @param url
    */
-  public void setFinalTrackingUrl(String url);
+  public abstract void setFinalTrackingUrl(String url);
   /**
    * Get the <em>diagnositic information</em> of the attempt 
    * @return <em>diagnositic information</em> of the attempt
    */
-  public String getDiagnostics();
+  public abstract String getDiagnostics();
 
-  public void setDiagnostics(String diagnostics);
+  public abstract void setDiagnostics(String diagnostics);
 
   /**
    * Get the <em>start time</em> of the application.
    * @return <em>start time</em> of the application
    */
-  public long getStartTime();
+  public abstract long getStartTime();
 
-  public void setStartTime(long startTime);
+  public abstract void setStartTime(long startTime);
 
   /**
    * Get the <em>final finish status</em> of the application.
    * @return <em>final finish status</em> of the application
    */
-  public FinalApplicationStatus getFinalApplicationStatus();
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
 
-  public void setFinalApplicationStatus(FinalApplicationStatus finishState);
+  public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState);
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Mon Jun 16 18:13:57 2014
@@ -24,7 +24,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Contains all the state data that needs to be stored persistently 
@@ -32,19 +35,43 @@ import org.apache.hadoop.yarn.server.res
  */
 @Public
 @Unstable
-public interface ApplicationStateData {
-  
+public abstract class ApplicationStateData {
+  public static ApplicationStateData newInstance(long submitTime,
+      long startTime, String user,
+      ApplicationSubmissionContext submissionContext,
+      RMAppState state, String diagnostics, long finishTime) {
+    ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
+    appState.setSubmitTime(submitTime);
+    appState.setStartTime(startTime);
+    appState.setUser(user);
+    appState.setApplicationSubmissionContext(submissionContext);
+    appState.setState(state);
+    appState.setDiagnostics(diagnostics);
+    appState.setFinishTime(finishTime);
+    return appState;
+  }
+
+  public static ApplicationStateData newInstance(
+      ApplicationState appState) {
+    return newInstance(appState.getSubmitTime(), appState.getStartTime(),
+        appState.getUser(), appState.getApplicationSubmissionContext(),
+        appState.getState(), appState.getDiagnostics(),
+        appState.getFinishTime());
+  }
+
+  public abstract ApplicationStateDataProto getProto();
+
   /**
    * The time at which the application was received by the Resource Manager
    * @return submitTime
    */
   @Public
   @Unstable
-  public long getSubmitTime();
+  public abstract long getSubmitTime();
   
   @Public
   @Unstable
-  public void setSubmitTime(long submitTime);
+  public abstract void setSubmitTime(long submitTime);
 
   /**
    * Get the <em>start time</em> of the application.
@@ -63,11 +90,11 @@ public interface ApplicationStateData {
    */
   @Public
   @Unstable
-  public void setUser(String user);
+  public abstract void setUser(String user);
   
   @Public
   @Unstable
-  public String getUser();
+  public abstract String getUser();
   
   /**
    * The {@link ApplicationSubmissionContext} for the application
@@ -76,34 +103,34 @@ public interface ApplicationStateData {
    */
   @Public
   @Unstable
-  public ApplicationSubmissionContext getApplicationSubmissionContext();
+  public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
   
   @Public
   @Unstable
-  public void setApplicationSubmissionContext(
+  public abstract void setApplicationSubmissionContext(
       ApplicationSubmissionContext context);
 
   /**
    * Get the final state of the application.
    * @return the final state of the application.
    */
-  public RMAppState getState();
+  public abstract RMAppState getState();
 
-  public void setState(RMAppState state);
+  public abstract void setState(RMAppState state);
 
   /**
    * Get the diagnostics information for the application master.
    * @return the diagnostics information for the application master.
    */
-  public String getDiagnostics();
+  public abstract String getDiagnostics();
 
-  public void setDiagnostics(String diagnostics);
+  public abstract void setDiagnostics(String diagnostics);
 
   /**
    * The finish time of the application.
    * @return the finish time of the application.,
    */
-  public long getFinishTime();
+  public abstract long getFinishTime();
 
-  public void setFinishTime(long finishTime);
+  public abstract void setFinishTime(long finishTime);
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Mon Jun 16 18:13:57 2014
@@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
@@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 
-public class ApplicationAttemptStateDataPBImpl
-extends ProtoBase<ApplicationAttemptStateDataProto> 
-implements ApplicationAttemptStateData {
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
 
+public class ApplicationAttemptStateDataPBImpl extends
+    ApplicationAttemptStateData {
   ApplicationAttemptStateDataProto proto = 
       ApplicationAttemptStateDataProto.getDefaultInstance();
   ApplicationAttemptStateDataProto.Builder builder = null;
@@ -60,7 +55,8 @@ implements ApplicationAttemptStateData {
     this.proto = proto;
     viaProto = true;
   }
-  
+
+  @Override
   public ApplicationAttemptStateDataProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -76,7 +72,8 @@ implements ApplicationAttemptStateData {
       builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
     }
     if(this.appAttemptTokens != null) {
-      builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
+      builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat(
+          this.appAttemptTokens));
     }
   }
 
@@ -148,7 +145,8 @@ implements ApplicationAttemptStateData {
     if(!p.hasAppAttemptTokens()) {
       return null;
     }
-    this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
+    this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
+        p.getAppAttemptTokens());
     return appAttemptTokens;
   }
 
@@ -249,24 +247,26 @@ implements ApplicationAttemptStateData {
     builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
   }
 
-  public static ApplicationAttemptStateData newApplicationAttemptStateData(
-      ApplicationAttemptId attemptId, Container container,
-      ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
-      String finalTrackingUrl, String diagnostics,
-      FinalApplicationStatus amUnregisteredFinalStatus) {
-    ApplicationAttemptStateData attemptStateData =
-        recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
-    attemptStateData.setAttemptId(attemptId);
-    attemptStateData.setMasterContainer(container);
-    attemptStateData.setAppAttemptTokens(attemptTokens);
-    attemptStateData.setState(finalState);
-    attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
-    attemptStateData.setDiagnostics(diagnostics);
-    attemptStateData.setStartTime(startTime);
-    attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
-    return attemptStateData;
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
   }
 
+  @Override
+  public boolean equals(Object other) {
+    // Require the exact same class: a supertype instance (even a plain
+    // Object) must compare unequal instead of failing the cast below.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+  
   private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
   public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
     return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());