You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2012/12/19 05:21:24 UTC

svn commit: r1423758 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ hadoop-yarn...

Author: acmurthy
Date: Wed Dec 19 04:21:18 2012
New Revision: 1423758

URL: http://svn.apache.org/viewvc?rev=1423758&view=rev
Log:
YARN-230. RM Restart phase 1 - includes support for saving/restarting all applications on an RM bounce. Contributed by Bikas Saha.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Dec 19 04:21:18 2012
@@ -28,6 +28,9 @@ Release 2.0.3-alpha - Unreleased 
 
     YARN-145. Add a Web UI to the fair share scheduler. (Sandy Ryza via tomwhite)
 
+    YARN-230. RM Restart phase 1 - includes support for saving/restarting all
+    applications on an RM bounce. (Bikas Saha via acmurthy)
+
   IMPROVEMENTS
 
     YARN-78. Changed UnManagedAM application to use YarnClient. (Bikas Saha via

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Contains the state data that needs to be persisted for an ApplicationAttempt.
+ */
+@Public
+@Unstable
+public interface ApplicationAttemptStateData {
+  
+  /**
+   * The ApplicationAttemptId for the application attempt.
+   * @return ApplicationAttemptId for the application attempt
+   */
+  @Public
+  @Unstable
+  public ApplicationAttemptId getAttemptId();
+  
+  public void setAttemptId(ApplicationAttemptId attemptId); // setter counterpart of getAttemptId()
+  
+  /**
+   * The master container running the application attempt.
+   * @return Container that hosts the attempt
+   */
+  @Public
+  @Unstable
+  public Container getMasterContainer();
+  
+  public void setMasterContainer(Container container); // setter counterpart of getMasterContainer()
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Contains all the state data that needs to be stored persistently
+ * for an Application.
+ */
+@Public
+@Unstable
+public interface ApplicationStateData {
+  
+  /**
+   * The time at which the application was received by the Resource Manager.
+   * @return submitTime
+   */
+  @Public
+  @Unstable
+  public long getSubmitTime();
+  
+  @Public
+  @Unstable
+  public void setSubmitTime(long submitTime); // setter counterpart of getSubmitTime()
+  
+  /**
+   * The {@link ApplicationSubmissionContext} for the application.
+   * The {@link ApplicationId} can be obtained from this context.
+   * @return ApplicationSubmissionContext
+   */
+  @Public
+  @Unstable
+  public ApplicationSubmissionContext getApplicationSubmissionContext();
+  
+  @Public
+  @Unstable
+  public void setApplicationSubmissionContext( // setter counterpart of getApplicationSubmissionContext()
+                                          ApplicationSubmissionContext context);
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProtoOrBuilder;
+
+public class ApplicationAttemptStateDataPBImpl // protobuf-backed implementation of ApplicationAttemptStateData
+extends ProtoBase<ApplicationAttemptStateDataProto> 
+implements ApplicationAttemptStateData {
+  
+  ApplicationAttemptStateDataProto proto = 
+      ApplicationAttemptStateDataProto.getDefaultInstance();
+  ApplicationAttemptStateDataProto.Builder builder = null;
+  boolean viaProto = false; // true: getters read 'proto'; false: getters read 'builder'
+  
+  private ApplicationAttemptId attemptId = null; // local copy; written to builder by mergeLocalToBuilder()
+  private Container masterContainer = null; // local copy; written to builder by mergeLocalToBuilder()
+  
+  public ApplicationAttemptStateDataPBImpl() { // starts empty, backed by a fresh builder
+    builder = ApplicationAttemptStateDataProto.newBuilder();
+  }
+
+  public ApplicationAttemptStateDataPBImpl( // wraps an already-built proto message
+      ApplicationAttemptStateDataProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  
+  public ApplicationAttemptStateDataProto getProto() { // merges local fields, then returns the built message
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build(); // mergeLocalToProto() left viaProto == true, so proto is kept as is
+    viaProto = true;
+    return proto;
+  }
+
+  private void mergeLocalToBuilder() { // copies each non-null local field into the builder
+    if (this.attemptId != null) {
+      builder.setAttemptId(((ApplicationAttemptIdPBImpl)attemptId).getProto()); // cast requires the PBImpl type
+    }
+    if(this.masterContainer != null) {
+      builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); // cast requires the PBImpl type
+    }
+  }
+
+  private void mergeLocalToProto() { // rebuilds 'proto' from builder plus local fields
+    if (viaProto) 
+      maybeInitBuilder(); // re-seed the builder from the current proto before merging
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() { // makes 'builder' the active backing store before a write
+    if (viaProto || builder == null) {
+      builder = ApplicationAttemptStateDataProto.newBuilder(proto); // new builder seeded from 'proto'
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public ApplicationAttemptId getAttemptId() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if(attemptId != null) {
+      return attemptId; // local copy takes precedence over the proto/builder value
+    }
+    if (!p.hasAttemptId()) {
+      return null; // field not set anywhere
+    }
+    attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId()); // wrap once and keep for later calls
+    return attemptId;
+  }
+
+  @Override
+  public void setAttemptId(ApplicationAttemptId attemptId) {
+    maybeInitBuilder();
+    if (attemptId == null) {
+      builder.clearAttemptId(); // null also clears the builder field, not just the local copy
+    }
+    this.attemptId = attemptId;
+  }
+
+  @Override
+  public Container getMasterContainer() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if(masterContainer != null) {
+      return masterContainer; // local copy takes precedence over the proto/builder value
+    }
+    if (!p.hasMasterContainer()) {
+      return null; // field not set anywhere
+    }
+    masterContainer = new ContainerPBImpl(p.getMasterContainer()); // wrap once and keep for later calls
+    return masterContainer;
+  }
+
+  @Override
+  public void setMasterContainer(Container container) {
+    maybeInitBuilder();
+    if (container == null) {
+      builder.clearMasterContainer(); // null also clears the builder field, not just the local copy
+    }
+    this.masterContainer = container;
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ApplicationStateData;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProtoOrBuilder;
+
+public class ApplicationStateDataPBImpl // protobuf-backed implementation of ApplicationStateData
+extends ProtoBase<ApplicationStateDataProto> 
+implements ApplicationStateData {
+  
+  ApplicationStateDataProto proto = 
+            ApplicationStateDataProto.getDefaultInstance();
+  ApplicationStateDataProto.Builder builder = null;
+  boolean viaProto = false; // true: getters read 'proto'; false: getters read 'builder'
+  
+  private ApplicationSubmissionContext applicationSubmissionContext = null; // local copy; written to builder on merge
+  
+  public ApplicationStateDataPBImpl() { // starts empty, backed by a fresh builder
+    builder = ApplicationStateDataProto.newBuilder();
+  }
+
+  public ApplicationStateDataPBImpl( // wraps an already-built proto message
+      ApplicationStateDataProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  
+  public ApplicationStateDataProto getProto() { // merges local fields, then returns the built message
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build(); // mergeLocalToProto() left viaProto == true, so proto is kept as is
+    viaProto = true;
+    return proto;
+  }
+
+  private void mergeLocalToBuilder() { // copies the non-null local field into the builder
+    if (this.applicationSubmissionContext != null) {
+      builder.setApplicationSubmissionContext(
+          ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) // cast requires the PBImpl type
+          .getProto());
+    }
+  }
+
+  private void mergeLocalToProto() { // rebuilds 'proto' from builder plus local fields
+    if (viaProto) 
+      maybeInitBuilder(); // re-seed the builder from the current proto before merging
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() { // makes 'builder' the active backing store before a write
+    if (viaProto || builder == null) {
+      builder = ApplicationStateDataProto.newBuilder(proto); // new builder seeded from 'proto'
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public long getSubmitTime() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasSubmitTime()) {
+      return -1; // -1 signals that no submit time has been set
+    }
+    return (p.getSubmitTime());
+  }
+
+  @Override
+  public void setSubmitTime(long submitTime) {
+    maybeInitBuilder();
+    builder.setSubmitTime(submitTime); // primitive field: written straight to the builder, no local copy
+  }
+
+  @Override
+  public ApplicationSubmissionContext getApplicationSubmissionContext() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if(applicationSubmissionContext != null) {
+      return applicationSubmissionContext; // local copy takes precedence over the proto/builder value
+    }
+    if (!p.hasApplicationSubmissionContext()) {
+      return null; // field not set anywhere
+    }
+    applicationSubmissionContext = // wrap once and keep for later calls
+        new ApplicationSubmissionContextPBImpl(
+                                          p.getApplicationSubmissionContext());
+    return applicationSubmissionContext;
+  }
+
+  @Override
+  public void setApplicationSubmissionContext(
+      ApplicationSubmissionContext context) {
+    maybeInitBuilder();
+    if (context == null) {
+      builder.clearApplicationSubmissionContext(); // null also clears the builder field, not just the local copy
+    }
+    this.applicationSubmissionContext = context;
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java Wed Dec 19 04:21:18 2012
@@ -210,7 +210,6 @@ implements ApplicationSubmissionContext 
   @Override
   public boolean getUnmanagedAM() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    //There is a default so cancelTokens should never be null
     return p.getUnmanagedAm();
   }
   
@@ -219,7 +218,7 @@ implements ApplicationSubmissionContext 
     maybeInitBuilder();
     builder.setUnmanagedAm(value);
   }
-
+  
   @Override
   public boolean getCancelTokensWhenComplete() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Wed Dec 19 04:21:18 2012
@@ -329,3 +329,15 @@ message StringBytesMapProto {
   optional bytes value = 2;
 }
 
+////////////////////////////////////////////////////////////////////////
+////// From recovery////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////
+message ApplicationStateDataProto { // wire form of ApplicationStateData (see ApplicationStateDataPBImpl)
+  optional int64 submit_time = 1; // time the application was received by the Resource Manager
+  optional ApplicationSubmissionContextProto application_submission_context = 2; // submission context of the application
+}
+
+message ApplicationAttemptStateDataProto { // wire form of ApplicationAttemptStateData (see ApplicationAttemptStateDataPBImpl)
+  optional ApplicationAttemptIdProto attemptId = 1; // id of the application attempt
+  optional ContainerProto master_container = 2; // master container running the attempt
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java Wed Dec 19 04:21:18 2012
@@ -28,8 +28,6 @@ import org.apache.hadoop.tools.GetGroups
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.service.Service.STATE;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -46,8 +44,7 @@ public class TestGetGroups extends GetGr
   @BeforeClass
   public static void setUpResourceManager() throws IOException, InterruptedException {
     conf = new YarnConfiguration();
-    RMStateStore store = StoreFactory.getStore(conf);
-    resourceManager = new ResourceManager(store) {
+    resourceManager = new ResourceManager() {
       @Override
       protected void doSecureLogin() throws IOException {
       };

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java Wed Dec 19 04:21:18 2012
@@ -34,7 +34,7 @@ public class TestYarnClient {
   @Test
   public void testClientStop() {
     Configuration conf = new Configuration();
-    ResourceManager rm = new ResourceManager(null);
+    ResourceManager rm = new ResourceManager();
     rm.init(conf);
     rm.start();
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Dec 19 04:21:18 2012
@@ -225,10 +225,12 @@ public class YarnConfiguration extends C
   public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 
     7*24*60*60*1000; // 7 days
   
+  public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
+  public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
   
   /** The class to use as the persistent store.*/
   public static final String RM_STORE = RM_PREFIX + "store.class";
-   
+  
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Dec 19 04:21:18 2012
@@ -205,6 +205,13 @@
   </property>
 
   <property>
+    <description>Enable RM to recover state after starting. If true, then 
+    yarn.resourcemanager.store.class must be specified</description>
+    <name>yarn.resourcemanager.recovery.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
     <description>The class to use as the persistent store.</description>
     <name>yarn.resourcemanager.store.class</name>
   </property>

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Dec 19 04:21:18 2012
@@ -263,6 +263,8 @@ public class ApplicationMasterService ex
     } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
       LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
       // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
+      // Reboot is not useful since after AM reboots, it will send register and 
+      // get an exception. Might as well throw an exception here.
       allocateResponse.setAMResponse(reboot);
       return allocateResponse;
     } 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Dec 19 04:21:18 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
@@ -75,6 +76,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -254,6 +256,20 @@ public class ClientRMService extends Abs
       // So call handle directly and do not send an event.
       rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
           .currentTimeMillis()));
+      
+      // If recovery is enabled then store the application information in a 
+      // blocking call to make sure that RM has stored the information needed 
+      // to restart the AM after RM restart without further client communication
+      RMStateStore stateStore = rmContext.getStateStore();
+      LOG.info("Storing Application with id " + applicationId);
+      try {
+        stateStore.storeApplication(rmContext.getRMApps().get(applicationId));
+      } catch (Exception e) {
+        // For HA this exception needs to be handled by giving up 
+        // master status if we got fenced
+        LOG.error("Failed to store application:" + applicationId, e);
+        ExitUtil.terminate(1, e);
+      }
 
       LOG.info("Application with id " + applicationId.getId() + 
           " submitted by user " + user);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Dec 19 04:21:18 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,10 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -48,7 +53,8 @@ import org.apache.hadoop.yarn.server.sec
 /**
  * This class manages the list of applications for the resource manager. 
  */
-public class RMAppManager implements EventHandler<RMAppManagerEvent> {
+public class RMAppManager implements EventHandler<RMAppManagerEvent>, 
+                                        Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppManager.class);
 
@@ -173,6 +179,10 @@ public class RMAppManager implements Eve
       
       completedApps.add(applicationId);  
       writeAuditLog(applicationId);
+      
+      // application completely done. Remove from state
+      RMStateStore store = rmContext.getStateStore();
+      store.removeApplication(rmContext.getRMApps().get(applicationId));
     }
   }
 
@@ -306,6 +316,37 @@ public class RMAppManager implements Eve
     }
     return credentials;
   }
+  
+  @Override
+  public void recover(RMState state) throws Exception {
+    RMStateStore store = rmContext.getStateStore();
+    assert store != null;
+    // recover applications
+    Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
+    LOG.info("Recovering " + appStates.size() + " applications");
+    for(ApplicationState appState : appStates.values()) {
+      // re-submit the application
+      // this is going to send an app start event but since the async dispatcher 
+      // has not started that event will be queued until we have completed 
+      // re-populating the state
+      if(appState.getApplicationSubmissionContext().getUnmanagedAM()) {
+        // do not recover unmanaged applications since current recovery 
+        // mechanism of restarting attempts does not work for them.
+        // This will need to be changed in work-preserving recovery, in which 
+        // RM will re-connect with the running AMs instead of restarting them
+        LOG.info("Not recovering unmanaged application " + appState.getAppId());
+        store.removeApplication(appState);
+      } else {
+        LOG.info("Recovering application " + appState.getAppId());
+        submitApplication(appState.getApplicationSubmissionContext(), 
+                          appState.getSubmitTime());
+        // re-populate attempt information in application
+        RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
+                                                          appState.getAppId());
+        appImpl.recover(state);
+      }
+    }
+  }
 
   @Override
   public void handle(RMAppManagerEvent event) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Wed Dec 19 04:21:18 2012
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.res
 public interface RMContext {
 
   Dispatcher getDispatcher();
+  
+  RMStateStore getStateStore();
 
   ConcurrentMap<ApplicationId, RMApp> getRMApps();
   

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Wed Dec 19 04:21:18 2012
@@ -23,7 +23,10 @@ import java.util.concurrent.ConcurrentMa
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class RMContextImpl implements RMContext {
 
   private final Dispatcher rmDispatcher;
@@ -48,6 +53,7 @@ public class RMContextImpl implements RM
 
   private AMLivelinessMonitor amLivelinessMonitor;
   private AMLivelinessMonitor amFinishingMonitor;
+  private RMStateStore stateStore = null;
   private ContainerAllocationExpirer containerAllocationExpirer;
   private final DelegationTokenRenewer tokenRenewer;
   private final ApplicationTokenSecretManager appTokenSecretManager;
@@ -55,6 +61,7 @@ public class RMContextImpl implements RM
   private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
 
   public RMContextImpl(Dispatcher rmDispatcher,
+      RMStateStore store,
       ContainerAllocationExpirer containerAllocationExpirer,
       AMLivelinessMonitor amLivelinessMonitor,
       AMLivelinessMonitor amFinishingMonitor,
@@ -63,6 +70,7 @@ public class RMContextImpl implements RM
       RMContainerTokenSecretManager containerTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
     this.rmDispatcher = rmDispatcher;
+    this.stateStore = store;
     this.containerAllocationExpirer = containerAllocationExpirer;
     this.amLivelinessMonitor = amLivelinessMonitor;
     this.amFinishingMonitor = amFinishingMonitor;
@@ -71,11 +79,39 @@ public class RMContextImpl implements RM
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.clientToAMTokenSecretManager = clientTokenSecretManager;
   }
+
+  @VisibleForTesting
+  // helper constructor for tests
+  public RMContextImpl(Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor,
+      DelegationTokenRenewer tokenRenewer,
+      ApplicationTokenSecretManager appTokenSecretManager,
+      RMContainerTokenSecretManager containerTokenSecretManager,
+      ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
+    this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, 
+          amFinishingMonitor, tokenRenewer, appTokenSecretManager, 
+          containerTokenSecretManager, clientTokenSecretManager);
+    RMStateStore nullStore = new NullRMStateStore();
+    nullStore.setDispatcher(rmDispatcher);
+    try {
+      nullStore.init(new YarnConfiguration());
+      setStateStore(nullStore);
+    } catch (Exception e) {
+      assert false;
+    }
+  }
   
   @Override
   public Dispatcher getDispatcher() {
     return this.rmDispatcher;
   }
+  
+  @Override 
+  public RMStateStore getStateStore() {
+    return stateStore;
+  }
 
   @Override
   public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
@@ -126,4 +162,9 @@ public class RMContextImpl implements RM
   public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
     return this.clientToAMTokenSecretManager;
   }
+  
+  @VisibleForTesting
+  public void setStateStore(RMStateStore store) {
+    stateStore = store;
+  }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Dec 19 04:21:18 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -45,10 +46,11 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -80,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.Web
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The ResourceManager is the main class that is a set of components.
  * "I am the ResourceManager. All your resources are belong to us..."
@@ -119,14 +123,13 @@ public class ResourceManager extends Com
   protected RMDelegationTokenSecretManager rmDTSecretManager;
   private WebApp webApp;
   protected RMContext rmContext;
-  private final RMStateStore store;
   protected ResourceTrackerService resourceTracker;
+  private boolean recoveryEnabled;
 
   private Configuration conf;
-
-  public ResourceManager(RMStateStore store) {
+  
+  public ResourceManager() {
     super("ResourceManager");
-    this.store = store;
   }
   
   public RMContext getRMContext() {
@@ -160,12 +163,34 @@ public class ResourceManager extends Com
 
     this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
     
+    boolean isRecoveryEnabled = conf.getBoolean(
+        YarnConfiguration.RECOVERY_ENABLED,
+        YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+    
+    RMStateStore rmStore = null;
+    if(isRecoveryEnabled) {
+      recoveryEnabled = true;
+      rmStore =  RMStateStoreFactory.getStore(conf);
+    } else {
+      recoveryEnabled = false;
+      rmStore = new NullRMStateStore();
+    }
+    try {
+      rmStore.init(conf);
+      rmStore.setDispatcher(rmDispatcher);
+    } catch (Exception e) {
+      // the Exception from stateStore.init() needs to be handled for 
+      // HA and we need to give up master status if we got fenced
+      LOG.error("Failed to init state store", e);
+      ExitUtil.terminate(1, e);
+    }
+    
     this.rmContext =
-        new RMContextImpl(this.rmDispatcher,
+        new RMContextImpl(this.rmDispatcher, rmStore,
           this.containerAllocationExpirer, amLivelinessMonitor,
           amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
           this.containerTokenSecretManager, this.clientToAMSecretManager);
-
+    
     // Register event handler for NodesListManager
     this.nodesListManager = new NodesListManager(this.rmContext);
     this.rmDispatcher.register(NodesListManagerEventType.class, 
@@ -226,9 +251,15 @@ public class ResourceManager extends Com
     addService(applicationMasterLauncher);
 
     new RMNMInfo(this.rmContext, this.scheduler);
-
+    
     super.init(conf);
   }
+  
+  @VisibleForTesting
+  protected void setRMStateStore(RMStateStore rmStore) {
+    rmStore.setDispatcher(rmDispatcher);
+    ((RMContextImpl) rmContext).setStateStore(rmStore);
+  }
 
   protected RMContainerTokenSecretManager createContainerTokenSecretManager(
       Configuration conf) {
@@ -502,6 +533,19 @@ public class ResourceManager extends Com
     this.appTokenSecretManager.start();
     this.containerTokenSecretManager.start();
 
+    if(recoveryEnabled) {
+      try {
+        RMStateStore rmStore = rmContext.getStateStore();
+        RMState state = rmStore.loadState();
+        recover(state);
+      } catch (Exception e) {
+        // the Exception from loadState() needs to be handled for 
+        // HA and we need to give up master status if we got fenced
+        LOG.error("Failed to load/recover state", e);
+        ExitUtil.terminate(1, e);
+      }
+    }
+
     startWepApp();
     DefaultMetricsSystem.initialize("ResourceManager");
     JvmMetrics.initSingleton("ResourceManager", null);
@@ -555,6 +599,13 @@ public class ResourceManager extends Com
 
     DefaultMetricsSystem.shutdown();
 
+    RMStateStore store = rmContext.getStateStore();
+    try {
+      store.close();
+    } catch (Exception e) {
+      LOG.error("Error closing store.", e);
+    }
+      
     super.stop();
   }
   
@@ -643,6 +694,8 @@ public class ResourceManager extends Com
 
   @Override
   public void recover(RMState state) throws Exception {
+    // recover applications
+    rmAppManager.recover(state);
   }
   
   public static void main(String argv[]) {
@@ -650,13 +703,11 @@ public class ResourceManager extends Com
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
     try {
       Configuration conf = new YarnConfiguration();
-      RMStateStore store =  StoreFactory.getStore(conf);
-      ResourceManager resourceManager = new ResourceManager(store);
+      ResourceManager resourceManager = new ResourceManager();
       ShutdownHookManager.get().addShutdownHook(
         new CompositeServiceShutdownHook(resourceManager),
         SHUTDOWN_HOOK_PRIORITY);
       resourceManager.init(conf);
-      //resourceManager.recover(store.restore());
       resourceManager.start();
     } catch (Throwable t) {
       LOG.fatal("Error starting ResourceManager", t);

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
+public class MemoryRMStateStore extends RMStateStore {
+  
+  RMState state = new RMState();
+  
+  @VisibleForTesting
+  public RMState getState() {
+    return state;
+  }
+  
+  @Override
+  public synchronized RMState loadState() throws Exception {
+    // return a copy of the state to allow for modification of the real state
+    RMState returnState = new RMState();
+    returnState.appState.putAll(state.appState);
+    return returnState;
+  }
+  
+  @Override
+  public synchronized void initInternal(Configuration conf) {
+  }
+  
+  @Override
+  protected synchronized void closeInternal() throws Exception {
+  }
+
+  /** Records the app in the in-memory map; locked like the other mutators. */
+  @Override
+  public synchronized void storeApplicationState(String appId, 
+      ApplicationStateDataPBImpl appStateData) throws Exception {
+    ApplicationState appState = new ApplicationState(
+                         appStateData.getSubmitTime(), 
+                         appStateData.getApplicationSubmissionContext());
+    state.appState.put(appState.getAppId(), appState);
+  }
+
+  @Override
+  public synchronized void storeApplicationAttemptState(String attemptIdStr, 
+                            ApplicationAttemptStateDataPBImpl attemptStateData)
+                            throws Exception {
+    ApplicationAttemptId attemptId = ConverterUtils
+                                        .toApplicationAttemptId(attemptIdStr);
+    ApplicationAttemptState attemptState = new ApplicationAttemptState(
+                            attemptId, attemptStateData.getMasterContainer());
+
+    ApplicationState appState = state.getApplicationState().get(
+        attemptState.getAttemptId().getApplicationId());
+    assert appState != null;
+
+    appState.attempts.put(attemptState.getAttemptId(), attemptState);
+  }
+
+  @Override
+  public synchronized void removeApplicationState(ApplicationState appState) 
+                                                            throws Exception {
+    ApplicationId appId = appState.getAppId();
+    ApplicationState removed = state.appState.remove(appId);
+    assert removed != null;
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+
+public class NullRMStateStore extends RMStateStore {
+
+  @Override
+  protected void initInternal(Configuration conf) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void closeInternal() throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public RMState loadState() throws Exception {
+    return null;
+  }
+
+  @Override
+  protected void storeApplicationState(String appId,
+      ApplicationStateDataPBImpl appStateData) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void storeApplicationAttemptState(String attemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void removeApplicationState(ApplicationState appState)
+      throws Exception {
+    // Do nothing
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Dec 19 04:21:18 2012
@@ -15,10 +15,313 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
+
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-public interface RMStateStore {
-  public interface RMState {
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+
+/**
+ * Base class to implement storage of ResourceManager state.
+ * Takes care of asynchronous notifications and interfacing with YARN objects.
+ * Real store implementations need to derive from it and implement blocking
+ * store and load methods to actually store and load the state.
+ */
+@Private
+@Unstable
+public abstract class RMStateStore {
+  public static final Log LOG = LogFactory.getLog(RMStateStore.class);
+
+  /**
+   * State of an application attempt
+   */
+  public static class ApplicationAttemptState {
+    final ApplicationAttemptId attemptId;
+    final Container masterContainer;
+
+    public ApplicationAttemptState(ApplicationAttemptId attemptId,
+                                   Container masterContainer) {
+      this.attemptId = attemptId;
+      this.masterContainer = masterContainer;
+    }
+
+    public Container getMasterContainer() {
+      return masterContainer;
+    }
+    public ApplicationAttemptId getAttemptId() {
+      return attemptId;
+    }
+  }
+
+  /**
+   * State of an application
+   */
+  public static class ApplicationState {
+    final ApplicationSubmissionContext context;
+    final long submitTime;
+    Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
+                  new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
+
+    ApplicationState(long submitTime, ApplicationSubmissionContext context) {
+      this.submitTime = submitTime;
+      this.context = context;
+    }
+
+    public ApplicationId getAppId() {
+      return context.getApplicationId();
+    }
+    public long getSubmitTime() {
+      return submitTime;
+    }
+    public int getAttemptCount() {
+      return attempts.size();
+    }
+    public ApplicationSubmissionContext getApplicationSubmissionContext() {
+      return context;
+    }
+    public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
+      return attempts.get(attemptId);
+    }
+  }
+
+  /**
+   * State of the ResourceManager
+   */
+  public static class RMState {
+    Map<ApplicationId, ApplicationState> appState =
+                                new HashMap<ApplicationId, ApplicationState>();
+
+    public Map<ApplicationId, ApplicationState> getApplicationState() {
+      return appState;
+    }
+  }
+
+  private Dispatcher rmDispatcher;
+
+  /**
+   * Dispatcher used to send state operation completion events to
+   * ResourceManager services
+   */
+  public void setDispatcher(Dispatcher dispatcher) {
+    this.rmDispatcher = dispatcher;
+  }
+
+  AsyncDispatcher dispatcher; // internal; handles this store's own events
+
+  /** Starts the internal dispatcher, then initializes the derived class. */
+  public synchronized void init(Configuration conf) throws Exception {
+    // create async handler
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.register(RMStateStoreEventType.class,
+                        new ForwardingEventHandler());
+    dispatcher.start();
+
+    initInternal(conf);
+  }
+
+  /**
+   * Derived classes initialize themselves using this method.
+   * The base class is initialized and the event dispatcher is ready to use at
+   * this point
+   */
+  protected abstract void initInternal(Configuration conf) throws Exception;
+
+  /** Closes the derived class, then always stops the internal dispatcher. */
+  public synchronized void close() throws Exception {
+    try {
+      closeInternal();
+    } finally {
+      dispatcher.stop(); // runs even if closeInternal() throws
+    }
+  }
+
+  /**
+   * Derived classes close themselves using this method.
+   * The base class will be closed and the event dispatcher will be shutdown
+   * after this
+   */
+  protected abstract void closeInternal() throws Exception;
+
+  /**
+   * Blocking API
+   * The derived class must recover state from the store and return a new
+   * RMState object populated with that state
+   * This must not be called on the dispatcher thread
+   */
+  public abstract RMState loadState() throws Exception;
+
+  /**
+   * Blocking API
+   * ResourceManager services use this to store the application's state
+   * This must not be called on the dispatcher thread
+   */
+  public synchronized void storeApplication(RMApp app) throws Exception {
+    ApplicationSubmissionContext context = app
+                                            .getApplicationSubmissionContext();
+    assert context instanceof ApplicationSubmissionContextPBImpl;
+
+    ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl();
+    appStateData.setSubmitTime(app.getSubmitTime());
+    appStateData.setApplicationSubmissionContext(context);
+
+    LOG.info("Storing info for app: " + context.getApplicationId());
+    storeApplicationState(app.getApplicationId().toString(), appStateData);
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of an
+   * application.
+   */
+  protected abstract void storeApplicationState(String appId,
+                                      ApplicationStateDataPBImpl appStateData)
+                                      throws Exception;
+
+  /**
+   * Non-blocking API
+   * ResourceManager services call this to store state on an application attempt
+   * This does not block the dispatcher threads
+   * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
+    ApplicationAttemptState attemptState = new ApplicationAttemptState(
+                appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
+    dispatcher.getEventHandler().handle(
+                                new RMStateStoreAppAttemptEvent(attemptState));
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of an
+   * application attempt
+   */
+  protected abstract void storeApplicationAttemptState(String attemptId,
+                            ApplicationAttemptStateDataPBImpl attemptStateData)
+                            throws Exception;
+
+  /**
+   * Non-blocking API
+   * ResourceManager services call this to remove an application from the state
+   * store
+   * This does not block the dispatcher threads
+   * There is no notification of completion for this operation.
+   */
+  public synchronized void removeApplication(RMApp app) {
+    ApplicationState appState = new ApplicationState(
+        app.getSubmitTime(), app.getApplicationSubmissionContext());
+    for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
+      ApplicationAttemptState attemptState = new ApplicationAttemptState(
+                appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
+      appState.attempts.put(attemptState.getAttemptId(), attemptState);
+    }
+    removeApplication(appState);
+  }
+
+  /**
+   * Non-Blocking API: queues removal of the given application's stored state
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void removeApplication(ApplicationState appState) {
+    dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to remove the state of an
+   * application and its attempts
+   */
+  protected abstract void removeApplicationState(ApplicationState appState)
+                                                             throws Exception;
+
+  // Dispatcher related code
+  /** Performs the blocking store or remove requested by one queued event. */
+  private synchronized void handleStoreEvent(RMStateStoreEvent event) {
+    switch(event.getType()) {
+      case STORE_APP_ATTEMPT:
+        {
+          ApplicationAttemptState attemptState =
+                    ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+          Exception storedException = null;
+          ApplicationAttemptStateDataPBImpl attemptStateData =
+                                        new ApplicationAttemptStateDataPBImpl();
+          attemptStateData.setAttemptId(attemptState.getAttemptId());
+          attemptStateData.setMasterContainer(attemptState.getMasterContainer());
+
+          LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+          try {
+            storeApplicationAttemptState(attemptState.getAttemptId().toString(),
+                                         attemptStateData);
+          } catch (Exception e) {
+            LOG.error("Error storing appAttempt: "
+                      + attemptState.getAttemptId(), e);
+            storedException = e;
+          } finally { // notify on success and on failure alike
+            notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+                                                storedException);
+          }
+        }
+        break;
+      case REMOVE_APP:
+        {
+          ApplicationState appState =
+                          ((RMStateStoreRemoveAppEvent) event).getAppState();
+          ApplicationId appId = appState.getAppId();
+          LOG.info("Removing info for app: " + appId);
+          try {
+            removeApplicationState(appState);
+          } catch (Exception e) { // logged only: removal sends no completion
+            LOG.error("Error removing app: " + appId, e);
+          }
+        }
+        break;
+      default:
+        LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+    }
+  }
+
+  /**
+   * Sends RMAppAttemptStoredEvent to the RM dispatcher once a store completes
+   * @param attemptId attempt whose store operation has finished
+   * @param storedException exception from the store operation, null if none
+   */
+  @SuppressWarnings("unchecked")
+  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
+                                                  Exception storedException) {
+    rmDispatcher.getEventHandler().handle(
+        new RMAppAttemptStoredEvent(attemptId, storedException));
+  }
+
+  /**
+   * EventHandler implementation which forwards events to the RMStateStore
+   * This hides the EventHandler methods of the store from its public interface
+   */
+  private final class ForwardingEventHandler
+                                  implements EventHandler<RMStateStoreEvent> {
+    @Override
+    public void handle(RMStateStoreEvent event) {
+      handleStoreEvent(event);
+    }
     
   }
 }
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+
+public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent {
+  ApplicationAttemptState attemptState;
+  /** Creates a STORE_APP_ATTEMPT event carrying the given attempt state. */
+  public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) {
+    super(RMStateStoreEventType.STORE_APP_ATTEMPT);
+    this.attemptState = attemptState;
+  }
+  /** Returns the attempt state held by this event. */
+  public ApplicationAttemptState getAppAttemptState() {
+    return attemptState;
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMStateStoreEvent extends AbstractEvent<RMStateStoreEventType> {
+  public RMStateStoreEvent(RMStateStoreEventType type) {
+    super(type); // read back via getType() in RMStateStore's event switch
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+public enum RMStateStoreEventType {
+  STORE_APP_ATTEMPT, // carried by RMStateStoreAppAttemptEvent
+  REMOVE_APP // carried by RMStateStoreRemoveAppEvent
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/** Creates the RMStateStore implementation selected by the configuration. */
+public class RMStateStoreFactory {
+  /** Instantiates the RM_STORE class, defaulting to MemoryRMStateStore. */
+  public static RMStateStore getStore(Configuration conf) {
+    Class<? extends RMStateStore> storeClass = conf.getClass(
+        YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
+        RMStateStore.class);
+    return ReflectionUtils.newInstance(storeClass, conf);
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+
+public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent {
+  ApplicationState appState;
+  /** Creates a REMOVE_APP event carrying the given application state. */
+  RMStateStoreRemoveAppEvent(ApplicationState appState) {
+    super(RMStateStoreEventType.REMOVE_APP);
+    this.appState = appState;
+  }
+  /** Returns the application state held by this event. */
+  public ApplicationState getAppState() {
+    return appState;
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java Wed Dec 19 04:21:18 2012
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;