You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2012/12/19 05:21:24 UTC
svn commit: r1423758 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/
hadoop-yarn...
Author: acmurthy
Date: Wed Dec 19 04:21:18 2012
New Revision: 1423758
URL: http://svn.apache.org/viewvc?rev=1423758&view=rev
Log:
YARN-230. RM Restart phase 1 - includes support for saving/restarting all applications on an RM bounce. Contributed by Bikas Saha.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Dec 19 04:21:18 2012
@@ -28,6 +28,9 @@ Release 2.0.3-alpha - Unreleased
YARN-145. Add a Web UI to the fair share scheduler. (Sandy Ryza via tomwhite)
+ YARN-230. RM Restart phase 1 - includes support for saving/restarting all
+ applications on an RM bounce. (Bikas Saha via acmurthy)
+
IMPROVEMENTS
YARN-78. Changed UnManagedAM application to use YarnClient. (Bikas Saha via
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Contains the state data that needs to be persisted for an
+ * ApplicationAttempt.
+ */
+@Public
+@Unstable
+public interface ApplicationAttemptStateData {
+
+  /**
+   * The ApplicationAttemptId for the application attempt.
+   * @return ApplicationAttemptId for the application attempt
+   */
+  @Public
+  @Unstable
+  public ApplicationAttemptId getAttemptId();
+
+  /**
+   * Set the ApplicationAttemptId for the application attempt.
+   * @param attemptId ApplicationAttemptId for the application attempt
+   */
+  @Public
+  @Unstable
+  public void setAttemptId(ApplicationAttemptId attemptId);
+
+  /**
+   * The master container running the application attempt.
+   * @return Container that hosts the attempt
+   */
+  @Public
+  @Unstable
+  public Container getMasterContainer();
+
+  /**
+   * Set the master container running the application attempt.
+   * @param container Container that hosts the attempt
+   */
+  @Public
+  @Unstable
+  public void setMasterContainer(Container container);
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Contains all the state data that needs to be stored persistently
+ * for an Application.
+ */
+@Public
+@Unstable
+public interface ApplicationStateData {
+
+  /**
+   * The time at which the application was received by the Resource Manager.
+   * NOTE(review): the protobuf-backed implementation returns -1 when no
+   * submit time has been set.
+   * @return submitTime
+   */
+  @Public
+  @Unstable
+  public long getSubmitTime();
+
+  /**
+   * Set the time at which the application was received by the Resource
+   * Manager.
+   * @param submitTime time at which the application was received
+   */
+  @Public
+  @Unstable
+  public void setSubmitTime(long submitTime);
+
+  /**
+   * The {@link ApplicationSubmissionContext} for the application.
+   * The {@link ApplicationId} of the application can be obtained from it.
+   * @return ApplicationSubmissionContext
+   */
+  @Public
+  @Unstable
+  public ApplicationSubmissionContext getApplicationSubmissionContext();
+
+  /**
+   * Set the {@link ApplicationSubmissionContext} for the application.
+   * @param context ApplicationSubmissionContext for the application
+   */
+  @Public
+  @Unstable
+  public void setApplicationSubmissionContext(
+      ApplicationSubmissionContext context);
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProtoOrBuilder;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link ApplicationAttemptStateData}.
+ * <p>
+ * Record-typed fields (attempt id, master container) are cached locally when
+ * set or first read, and are merged into the underlying
+ * {@link ApplicationAttemptStateDataProto} only when {@link #getProto()} is
+ * called.
+ */
+public class ApplicationAttemptStateDataPBImpl
+    extends ProtoBase<ApplicationAttemptStateDataProto>
+    implements ApplicationAttemptStateData {
+
+  ApplicationAttemptStateDataProto proto =
+      ApplicationAttemptStateDataProto.getDefaultInstance();
+  ApplicationAttemptStateDataProto.Builder builder = null;
+  // true while 'proto' holds the current state; false while 'builder' does
+  boolean viaProto = false;
+
+  // Locally cached records; copied into the builder by mergeLocalToBuilder()
+  private ApplicationAttemptId attemptId = null;
+  private Container masterContainer = null;
+
+  /** Create an empty record backed by a fresh builder. */
+  public ApplicationAttemptStateDataPBImpl() {
+    builder = ApplicationAttemptStateDataProto.newBuilder();
+  }
+
+  /**
+   * Wrap an existing protobuf message.
+   * @param proto the message whose fields back this record
+   */
+  public ApplicationAttemptStateDataPBImpl(
+      ApplicationAttemptStateDataProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merge any locally cached fields and return the protobuf message.
+   * @return the up-to-date protobuf representation of this record
+   */
+  public ApplicationAttemptStateDataProto getProto() {
+    mergeLocalToProto();
+    // mergeLocalToProto() always rebuilds proto and leaves viaProto == true,
+    // so proto is already current here.
+    return proto;
+  }
+
+  /**
+   * Copy locally cached records into the builder.
+   * NOTE(review): assumes the cached values are the PBImpl variants; any
+   * other implementation will fail the casts below.
+   */
+  private void mergeLocalToBuilder() {
+    if (this.attemptId != null) {
+      builder.setAttemptId(((ApplicationAttemptIdPBImpl) attemptId).getProto());
+    }
+    if (this.masterContainer != null) {
+      builder.setMasterContainer(
+          ((ContainerPBImpl) masterContainer).getProto());
+    }
+  }
+
+  /** Rebuild proto from the builder plus any locally cached records. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Make the builder (seeded from proto if needed) the active copy. */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ApplicationAttemptStateDataProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @return the attempt id, or null if none has been set
+   */
+  @Override
+  public ApplicationAttemptId getAttemptId() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (attemptId != null) {
+      return attemptId;
+    }
+    if (!p.hasAttemptId()) {
+      return null;
+    }
+    attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId());
+    return attemptId;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Passing null clears the field.
+   */
+  @Override
+  public void setAttemptId(ApplicationAttemptId attemptId) {
+    maybeInitBuilder();
+    if (attemptId == null) {
+      builder.clearAttemptId();
+    }
+    this.attemptId = attemptId;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @return the master container, or null if none has been set
+   */
+  @Override
+  public Container getMasterContainer() {
+    ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (masterContainer != null) {
+      return masterContainer;
+    }
+    if (!p.hasMasterContainer()) {
+      return null;
+    }
+    masterContainer = new ContainerPBImpl(p.getMasterContainer());
+    return masterContainer;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Passing null clears the field.
+   */
+  @Override
+  public void setMasterContainer(Container container) {
+    maybeInitBuilder();
+    if (container == null) {
+      builder.clearMasterContainer();
+    }
+    this.masterContainer = container;
+  }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ApplicationStateData;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProtoOrBuilder;
+
+/**
+ * Protocol-buffer backed implementation of {@link ApplicationStateData}.
+ * <p>
+ * The submit time is written straight into the builder, while the
+ * {@link ApplicationSubmissionContext} is cached locally and merged into the
+ * underlying {@link ApplicationStateDataProto} only when {@link #getProto()}
+ * is called.
+ */
+public class ApplicationStateDataPBImpl
+    extends ProtoBase<ApplicationStateDataProto>
+    implements ApplicationStateData {
+
+  ApplicationStateDataProto proto =
+      ApplicationStateDataProto.getDefaultInstance();
+  ApplicationStateDataProto.Builder builder = null;
+  // true while 'proto' holds the current state; false while 'builder' does
+  boolean viaProto = false;
+
+  // Locally cached record; copied into the builder by mergeLocalToBuilder()
+  private ApplicationSubmissionContext applicationSubmissionContext = null;
+
+  /** Create an empty record backed by a fresh builder. */
+  public ApplicationStateDataPBImpl() {
+    builder = ApplicationStateDataProto.newBuilder();
+  }
+
+  /**
+   * Wrap an existing protobuf message.
+   * @param proto the message whose fields back this record
+   */
+  public ApplicationStateDataPBImpl(
+      ApplicationStateDataProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merge any locally cached fields and return the protobuf message.
+   * @return the up-to-date protobuf representation of this record
+   */
+  public ApplicationStateDataProto getProto() {
+    mergeLocalToProto();
+    // mergeLocalToProto() always rebuilds proto and leaves viaProto == true,
+    // so proto is already current here.
+    return proto;
+  }
+
+  /**
+   * Copy the locally cached submission context into the builder.
+   * NOTE(review): assumes the cached context is an
+   * ApplicationSubmissionContextPBImpl; other implementations fail the cast.
+   */
+  private void mergeLocalToBuilder() {
+    if (this.applicationSubmissionContext != null) {
+      builder.setApplicationSubmissionContext(
+          ((ApplicationSubmissionContextPBImpl) applicationSubmissionContext)
+              .getProto());
+    }
+  }
+
+  /** Rebuild proto from the builder plus any locally cached records. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Make the builder (seeded from proto if needed) the active copy. */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ApplicationStateDataProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @return the submit time, or -1 if none has been set
+   */
+  @Override
+  public long getSubmitTime() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasSubmitTime()) {
+      return -1;
+    }
+    return p.getSubmitTime();
+  }
+
+  @Override
+  public void setSubmitTime(long submitTime) {
+    maybeInitBuilder();
+    builder.setSubmitTime(submitTime);
+  }
+
+  /**
+   * {@inheritDoc}
+   * @return the submission context, or null if none has been set
+   */
+  @Override
+  public ApplicationSubmissionContext getApplicationSubmissionContext() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (applicationSubmissionContext != null) {
+      return applicationSubmissionContext;
+    }
+    if (!p.hasApplicationSubmissionContext()) {
+      return null;
+    }
+    applicationSubmissionContext =
+        new ApplicationSubmissionContextPBImpl(
+            p.getApplicationSubmissionContext());
+    return applicationSubmissionContext;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Passing null clears the field.
+   */
+  @Override
+  public void setApplicationSubmissionContext(
+      ApplicationSubmissionContext context) {
+    maybeInitBuilder();
+    if (context == null) {
+      builder.clearApplicationSubmissionContext();
+    }
+    this.applicationSubmissionContext = context;
+  }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java Wed Dec 19 04:21:18 2012
@@ -210,7 +210,6 @@ implements ApplicationSubmissionContext
@Override
public boolean getUnmanagedAM() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- //There is a default so cancelTokens should never be null
return p.getUnmanagedAm();
}
@@ -219,7 +218,7 @@ implements ApplicationSubmissionContext
maybeInitBuilder();
builder.setUnmanagedAm(value);
}
-
+
@Override
public boolean getCancelTokensWhenComplete() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Wed Dec 19 04:21:18 2012
@@ -329,3 +329,15 @@ message StringBytesMapProto {
optional bytes value = 2;
}
+////////////////////////////////////////////////////////////////////////
+////// From recovery////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////
+// State of an application persisted by the RM for recovery.
+message ApplicationStateDataProto {
+  optional int64 submit_time = 1;
+  optional ApplicationSubmissionContextProto application_submission_context = 2;
+}
+
+// State of an application attempt persisted by the RM for recovery.
+message ApplicationAttemptStateDataProto {
+  // snake_case like the other fields; the tag number (and so the wire format)
+  // and the generated Java accessor names (getAttemptId etc.) are unchanged.
+  optional ApplicationAttemptIdProto attempt_id = 1;
+  optional ContainerProto master_container = 2;
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java Wed Dec 19 04:21:18 2012
@@ -28,8 +28,6 @@ import org.apache.hadoop.tools.GetGroups
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.Before;
@@ -46,8 +44,7 @@ public class TestGetGroups extends GetGr
@BeforeClass
public static void setUpResourceManager() throws IOException, InterruptedException {
conf = new YarnConfiguration();
- RMStateStore store = StoreFactory.getStore(conf);
- resourceManager = new ResourceManager(store) {
+ resourceManager = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
};
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java Wed Dec 19 04:21:18 2012
@@ -34,7 +34,7 @@ public class TestYarnClient {
@Test
public void testClientStop() {
Configuration conf = new Configuration();
- ResourceManager rm = new ResourceManager(null);
+ ResourceManager rm = new ResourceManager();
rm.init(conf);
rm.start();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Dec 19 04:21:18 2012
@@ -225,10 +225,12 @@ public class YarnConfiguration extends C
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days
+ /**
+ * Whether the RM should recover its state after starting. If true, then
+ * {@link #RM_STORE} must also be specified.
+ */
+ public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
+ /** Default for {@link #RECOVERY_ENABLED}: recovery is disabled. */
+ public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
-
+
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Dec 19 04:21:18 2012
@@ -205,6 +205,13 @@
</property>
<property>
+ <description>Enable the RM to recover its state after starting. If true,
+ yarn.resourcemanager.store.class must also be specified.</description>
+ <name>yarn.resourcemanager.recovery.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
<description>The class to use as the persistent store.</description>
<name>yarn.resourcemanager.store.class</name>
</property>
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Dec 19 04:21:18 2012
@@ -263,6 +263,8 @@ public class ApplicationMasterService ex
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
+ // Reboot is not useful since after AM reboots, it will send register and
+ // get an exception. Might as well throw an exception here.
allocateResponse.setAMResponse(reboot);
return allocateResponse;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Dec 19 04:21:18 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
@@ -75,6 +76,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -254,6 +256,20 @@ public class ClientRMService extends Abs
// So call handle directly and do not send an event.
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
.currentTimeMillis()));
+
+ // Store the application information with a blocking call so that, when
+ // recovery is enabled, the RM has persisted everything needed to restart the
+ // AM after an RM restart without further client communication.
+ RMStateStore stateStore = rmContext.getStateStore();
+ LOG.info("Storing Application with id " + applicationId);
+ try {
+ stateStore.storeApplication(rmContext.getRMApps().get(applicationId));
+ } catch (Exception e) {
+ // For HA this exception needs to be handled by giving up
+ // master status if we got fenced
+ LOG.error("Failed to store application:" + applicationId, e);
+ ExitUtil.terminate(1, e);
+ }
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Dec 19 04:21:18 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,10 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -48,7 +53,8 @@ import org.apache.hadoop.yarn.server.sec
/**
* This class manages the list of applications for the resource manager.
*/
-public class RMAppManager implements EventHandler<RMAppManagerEvent> {
+public class RMAppManager implements EventHandler<RMAppManagerEvent>,
+ Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
@@ -173,6 +179,10 @@ public class RMAppManager implements Eve
completedApps.add(applicationId);
writeAuditLog(applicationId);
+
+ // application completely done. Remove from state
+ RMStateStore store = rmContext.getStateStore();
+ store.removeApplication(rmContext.getRMApps().get(applicationId));
}
}
@@ -306,6 +316,37 @@ public class RMAppManager implements Eve
}
return credentials;
}
+
+  /**
+   * Re-submits every application found in the recovered state. Unmanaged-AM
+   * applications are not recovered; their stored state is removed instead.
+   */
+  @Override
+  public void recover(RMState state) throws Exception {
+    final RMStateStore stateStore = rmContext.getStateStore();
+    assert stateStore != null;
+    Map<ApplicationId, ApplicationState> storedApps = state.getApplicationState();
+    LOG.info("Recovering " + storedApps.size() + " applications");
+    for (ApplicationState storedApp : storedApps.values()) {
+      ApplicationId appId = storedApp.getAppId();
+      if (storedApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+        // Recovery works by restarting attempts, which is not possible for
+        // unmanaged AMs. Work-preserving recovery (RM re-connecting to running
+        // AMs) will need to revisit this.
+        LOG.info("Not recovering unmanaged application " + appId);
+        stateStore.removeApplication(storedApp);
+        continue;
+      }
+      LOG.info("Recovering application " + appId);
+      // Re-submitting sends an app start event; the async dispatcher has not
+      // started yet, so that event stays queued until state is re-populated.
+      submitApplication(storedApp.getApplicationSubmissionContext(),
+          storedApp.getSubmitTime());
+      // Re-populate the attempt information of the freshly submitted app.
+      RMAppImpl recoveredApp = (RMAppImpl) rmContext.getRMApps().get(appId);
+      recoveredApp.recover(state);
+    }
+  }
@Override
public void handle(RMAppManagerEvent event) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Wed Dec 19 04:21:18 2012
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.res
public interface RMContext {
Dispatcher getDispatcher();
+
+ RMStateStore getStateStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Wed Dec 19 04:21:18 2012
@@ -23,7 +23,10 @@ import java.util.concurrent.ConcurrentMa
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import com.google.common.annotations.VisibleForTesting;
+
public class RMContextImpl implements RMContext {
private final Dispatcher rmDispatcher;
@@ -48,6 +53,7 @@ public class RMContextImpl implements RM
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
+ private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private final DelegationTokenRenewer tokenRenewer;
private final ApplicationTokenSecretManager appTokenSecretManager;
@@ -55,6 +61,7 @@ public class RMContextImpl implements RM
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
public RMContextImpl(Dispatcher rmDispatcher,
+ RMStateStore store,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
@@ -63,6 +70,7 @@ public class RMContextImpl implements RM
RMContainerTokenSecretManager containerTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
this.rmDispatcher = rmDispatcher;
+ this.stateStore = store;
this.containerAllocationExpirer = containerAllocationExpirer;
this.amLivelinessMonitor = amLivelinessMonitor;
this.amFinishingMonitor = amFinishingMonitor;
@@ -71,11 +79,39 @@ public class RMContextImpl implements RM
this.containerTokenSecretManager = containerTokenSecretManager;
this.clientToAMTokenSecretManager = clientTokenSecretManager;
}
+
+  /**
+   * Helper constructor for tests. It behaves like the main constructor, but
+   * installs an initialized {@link NullRMStateStore} bound to
+   * {@code rmDispatcher} so callers need not supply a state store.
+   *
+   * @throws IllegalStateException if the null store fails to initialize
+   */
+  @VisibleForTesting
+  public RMContextImpl(Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor,
+      DelegationTokenRenewer tokenRenewer,
+      ApplicationTokenSecretManager appTokenSecretManager,
+      RMContainerTokenSecretManager containerTokenSecretManager,
+      ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
+    this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
+        amFinishingMonitor, tokenRenewer, appTokenSecretManager,
+        containerTokenSecretManager, clientTokenSecretManager);
+    RMStateStore nullStore = new NullRMStateStore();
+    nullStore.setDispatcher(rmDispatcher);
+    try {
+      nullStore.init(new YarnConfiguration());
+    } catch (Exception e) {
+      // Do not swallow: with assertions disabled, the context would otherwise
+      // silently end up without a state store.
+      throw new IllegalStateException("Failed to initialize NullRMStateStore", e);
+    }
+    // Assign directly instead of calling the overridable setStateStore()
+    // from a constructor.
+    this.stateStore = nullStore;
+  }
@Override
public Dispatcher getDispatcher() {
return this.rmDispatcher;
}
+
+  /** Returns the state store this context was built with or last given. */
+  @Override
+  public RMStateStore getStateStore() {
+    return this.stateStore;
+  }
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
@@ -126,4 +162,9 @@ public class RMContextImpl implements RM
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
+
+  /** Replaces the store returned by {@link #getStateStore()}. */
+  @VisibleForTesting
+  public void setStateStore(RMStateStore store) {
+    this.stateStore = store;
+  }
}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Dec 19 04:21:18 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
@@ -45,10 +46,11 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -80,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.Web
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The ResourceManager is the main class that is a set of components.
* "I am the ResourceManager. All your resources are belong to us..."
@@ -119,14 +123,13 @@ public class ResourceManager extends Com
protected RMDelegationTokenSecretManager rmDTSecretManager;
private WebApp webApp;
protected RMContext rmContext;
- private final RMStateStore store;
protected ResourceTrackerService resourceTracker;
+ private boolean recoveryEnabled;
private Configuration conf;
-
- public ResourceManager(RMStateStore store) {
+
+ public ResourceManager() {
super("ResourceManager");
- this.store = store;
}
public RMContext getRMContext() {
@@ -160,12 +163,34 @@ public class ResourceManager extends Com
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
+ boolean isRecoveryEnabled = conf.getBoolean(
+ YarnConfiguration.RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+
+ RMStateStore rmStore = null;
+ if(isRecoveryEnabled) {
+ recoveryEnabled = true;
+ rmStore = RMStateStoreFactory.getStore(conf);
+ } else {
+ recoveryEnabled = false;
+ rmStore = new NullRMStateStore();
+ }
+ try {
+ rmStore.init(conf);
+ rmStore.setDispatcher(rmDispatcher);
+ } catch (Exception e) {
+ // the Exception from stateStore.init() needs to be handled for
+ // HA and we need to give up master status if we got fenced
+ LOG.error("Failed to init state store", e);
+ ExitUtil.terminate(1, e);
+ }
+
this.rmContext =
- new RMContextImpl(this.rmDispatcher,
+ new RMContextImpl(this.rmDispatcher, rmStore,
this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
this.containerTokenSecretManager, this.clientToAMSecretManager);
-
+
// Register event handler for NodesListManager
this.nodesListManager = new NodesListManager(this.rmContext);
this.rmDispatcher.register(NodesListManagerEventType.class,
@@ -226,9 +251,15 @@ public class ResourceManager extends Com
addService(applicationMasterLauncher);
new RMNMInfo(this.rmContext, this.scheduler);
-
+
super.init(conf);
}
+
+  /**
+   * Swaps in a different state store, binding it to this RM's dispatcher
+   * first. The new store is not initialized here, and the previous store is
+   * not closed.
+   */
+  @VisibleForTesting
+  protected void setRMStateStore(RMStateStore rmStore) {
+    rmStore.setDispatcher(this.rmDispatcher);
+    RMContextImpl contextImpl = (RMContextImpl) this.rmContext;
+    contextImpl.setStateStore(rmStore);
+  }
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
@@ -502,6 +533,19 @@ public class ResourceManager extends Com
this.appTokenSecretManager.start();
this.containerTokenSecretManager.start();
+ if(recoveryEnabled) {
+ try {
+ RMStateStore rmStore = rmContext.getStateStore();
+ RMState state = rmStore.loadState();
+ recover(state);
+ } catch (Exception e) {
+ // the Exception from loadState() needs to be handled for
+ // HA and we need to give up master status if we got fenced
+ LOG.error("Failed to load/recover state", e);
+ ExitUtil.terminate(1, e);
+ }
+ }
+
startWepApp();
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
@@ -555,6 +599,13 @@ public class ResourceManager extends Com
DefaultMetricsSystem.shutdown();
+ RMStateStore store = rmContext.getStateStore();
+ try {
+ store.close();
+ } catch (Exception e) {
+ LOG.error("Error closing store.", e);
+ }
+
super.stop();
}
@@ -643,6 +694,8 @@ public class ResourceManager extends Com
@Override
public void recover(RMState state) throws Exception {
+ // recover applications: RMAppManager re-submits each stored application
+ rmAppManager.recover(state);
}
public static void main(String argv[]) {
@@ -650,13 +703,11 @@ public class ResourceManager extends Com
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
- RMStateStore store = StoreFactory.getStore(conf);
- ResourceManager resourceManager = new ResourceManager(store);
+ ResourceManager resourceManager = new ResourceManager();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf);
- //resourceManager.recover(store.restore());
resourceManager.start();
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link RMStateStore} that keeps all state in process memory, so nothing
+ * survives a process restart. It is presumably intended for tests (see the
+ * {@code @VisibleForTesting} accessor); TODO confirm.
+ *
+ * Every method that reads-and-copies or mutates {@link #state} is
+ * synchronized on this instance. Stores can presumably arrive from RPC
+ * handler threads, and they must not race with each other or with
+ * {@link #loadState()}.
+ */
+@Private
+@Unstable
+public class MemoryRMStateStore extends RMStateStore {
+
+  RMState state = new RMState();
+
+  /**
+   * Returns the live state object held by this store. This is not a copy,
+   * and it is not protected by the store's lock once returned.
+   */
+  @VisibleForTesting
+  public RMState getState() {
+    return state;
+  }
+
+  /**
+   * Returns a copy of the stored state, so callers may modify the returned
+   * application map without affecting this store. Only the top-level map is
+   * copied; the contained ApplicationState objects are shared.
+   */
+  @Override
+  public synchronized RMState loadState() throws Exception {
+    RMState returnState = new RMState();
+    returnState.appState.putAll(state.appState);
+    return returnState;
+  }
+
+  @Override
+  public synchronized void initInternal(Configuration conf) {
+  }
+
+  @Override
+  protected synchronized void closeInternal() throws Exception {
+  }
+
+  /**
+   * Records a new application. This method is synchronized like the other
+   * mutators. Without the lock, concurrent stores could corrupt the
+   * application map or interleave with the copy taken in loadState().
+   */
+  @Override
+  public synchronized void storeApplicationState(String appId,
+      ApplicationStateDataPBImpl appStateData)
+      throws Exception {
+    ApplicationState appState = new ApplicationState(
+        appStateData.getSubmitTime(),
+        appStateData.getApplicationSubmissionContext());
+    state.appState.put(appState.getAppId(), appState);
+  }
+
+  /** Records an attempt under its owning application. */
+  @Override
+  public synchronized void storeApplicationAttemptState(String attemptIdStr,
+      ApplicationAttemptStateDataPBImpl attemptStateData)
+      throws Exception {
+    ApplicationAttemptId attemptId = ConverterUtils
+        .toApplicationAttemptId(attemptIdStr);
+    ApplicationAttemptState attemptState = new ApplicationAttemptState(
+        attemptId, attemptStateData.getMasterContainer());
+
+    ApplicationState appState = state.getApplicationState().get(
+        attemptState.getAttemptId().getApplicationId());
+    // The owning application is expected to have been stored already.
+    assert appState != null;
+
+    appState.attempts.put(attemptState.getAttemptId(), attemptState);
+  }
+
+  /** Removes the application entry, which also drops its stored attempts. */
+  @Override
+  public synchronized void removeApplicationState(ApplicationState appState)
+      throws Exception {
+    ApplicationId appId = appState.getAppId();
+    ApplicationState removed = state.appState.remove(appId);
+    assert removed != null;
+  }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+
+/**
+ * {@link RMStateStore} that stores nothing. The ResourceManager installs it
+ * when recovery is disabled, so every store/remove operation is a no-op.
+ */
+public class NullRMStateStore extends RMStateStore {
+
+  @Override
+  protected void initInternal(Configuration conf) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void closeInternal() throws Exception {
+    // Do nothing
+  }
+
+  /**
+   * Returns an empty state, since nothing is ever stored. An empty state is
+   * returned instead of {@code null} because the recovery path dereferences
+   * the loaded state (to iterate its applications) without a null check.
+   */
+  @Override
+  public RMState loadState() throws Exception {
+    return new RMState();
+  }
+
+  @Override
+  protected void storeApplicationState(String appId,
+      ApplicationStateDataPBImpl appStateData) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void storeApplicationAttemptState(String attemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected void removeApplicationState(ApplicationState appState)
+      throws Exception {
+    // Do nothing
+  }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Dec 19 04:21:18 2012
@@ -15,10 +15,313 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-public interface RMStateStore {
- public interface RMState {
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+
+/**
+ * Base class to implement storage of ResourceManager state.
+ * Takes care of asynchronous notifications and interfacing with YARN objects.
+ * Real store implementations need to derive from it and implement blocking
+ * store and load methods to actually store and load the state.
+ */
+@Private
+@Unstable
+public abstract class RMStateStore {
+  public static final Log LOG = LogFactory.getLog(RMStateStore.class);
+
+  /**
+   * State of an application attempt: its id and its master container.
+   */
+  public static class ApplicationAttemptState {
+    final ApplicationAttemptId attemptId;
+    final Container masterContainer;
+
+    public ApplicationAttemptState(ApplicationAttemptId attemptId,
+        Container masterContainer) {
+      this.attemptId = attemptId;
+      this.masterContainer = masterContainer;
+    }
+
+    public Container getMasterContainer() {
+      return masterContainer;
+    }
+    public ApplicationAttemptId getAttemptId() {
+      return attemptId;
+    }
+  }
+
+  /**
+   * State of an application: its submission context, its submit time and
+   * the states of its attempts keyed by attempt id.
+   */
+  public static class ApplicationState {
+    final ApplicationSubmissionContext context;
+    final long submitTime;
+    Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
+        new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
+
+    ApplicationState(long submitTime, ApplicationSubmissionContext context) {
+      this.submitTime = submitTime;
+      this.context = context;
+    }
+
+    public ApplicationId getAppId() {
+      return context.getApplicationId();
+    }
+    public long getSubmitTime() {
+      return submitTime;
+    }
+    public int getAttemptCount() {
+      return attempts.size();
+    }
+    public ApplicationSubmissionContext getApplicationSubmissionContext() {
+      return context;
+    }
+    public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
+      return attempts.get(attemptId);
+    }
+  }
+
+  /**
+   * State of the ResourceManager: the states of all applications keyed by
+   * application id.
+   */
+  public static class RMState {
+    Map<ApplicationId, ApplicationState> appState =
+        new HashMap<ApplicationId, ApplicationState>();
+
+    public Map<ApplicationId, ApplicationState> getApplicationState() {
+      return appState;
+    }
+  }
+
+  private Dispatcher rmDispatcher;
+
+  /**
+   * Dispatcher used to send state operation completion events to
+   * ResourceManager services
+   */
+  public void setDispatcher(Dispatcher dispatcher) {
+    this.rmDispatcher = dispatcher;
+  }
+
+  AsyncDispatcher dispatcher;
+
+  /**
+   * Creates, registers and starts the internal asynchronous dispatcher that
+   * processes {@link RMStateStoreEventType} events, then lets the derived
+   * class initialize itself via {@link #initInternal(Configuration)}.
+   */
+  public synchronized void init(Configuration conf) throws Exception{
+    // create async handler
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.register(RMStateStoreEventType.class,
+        new ForwardingEventHandler());
+    dispatcher.start();
+
+    initInternal(conf);
+  }
+
+  /**
+   * Derived classes initialize themselves using this method.
+   * The base class is initialized and the event dispatcher is ready to use at
+   * this point
+   */
+  protected abstract void initInternal(Configuration conf) throws Exception;
+
+  /**
+   * Closes the derived store via {@link #closeInternal()} and then stops the
+   * internal dispatcher. The dispatcher is stopped even when
+   * {@code closeInternal()} throws, so it is not left running; the exception
+   * is still propagated to the caller.
+   */
+  public synchronized void close() throws Exception {
+    try {
+      closeInternal();
+    } finally {
+      dispatcher.stop();
+    }
+  }
+
+  /**
+   * Derived classes close themselves using this method.
+   * The base class will be closed and the event dispatcher will be shutdown
+   * after this
+   */
+  protected abstract void closeInternal() throws Exception;
+
+  /**
+   * Blocking API
+   * The derived class must recover state from the store and return a new
+   * RMState object populated with that state
+   * This must not be called on the dispatcher thread
+   */
+  public abstract RMState loadState() throws Exception;
+
+  /**
+   * Blocking API
+   * ResourceManager services use this to store the application's state
+   * This must not be called on the dispatcher thread
+   */
+  public synchronized void storeApplication(RMApp app) throws Exception {
+    ApplicationSubmissionContext context = app
+        .getApplicationSubmissionContext();
+    assert context instanceof ApplicationSubmissionContextPBImpl;
+
+    ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl();
+    appStateData.setSubmitTime(app.getSubmitTime());
+    appStateData.setApplicationSubmissionContext(context);
+
+    LOG.info("Storing info for app: " + context.getApplicationId());
+    storeApplicationState(app.getApplicationId().toString(), appStateData);
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of an
+   * application.
+   */
+  protected abstract void storeApplicationState(String appId,
+      ApplicationStateDataPBImpl appStateData)
+      throws Exception;
+
+  /**
+   * Non-blocking API
+   * ResourceManager services call this to store state on an application attempt
+   * This does not block the dispatcher threads
+   * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
+    ApplicationAttemptState attemptState = new ApplicationAttemptState(
+        appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
+    dispatcher.getEventHandler().handle(
+        new RMStateStoreAppAttemptEvent(attemptState));
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of an
+   * application attempt
+   */
+  protected abstract void storeApplicationAttemptState(String attemptId,
+      ApplicationAttemptStateDataPBImpl attemptStateData)
+      throws Exception;
+
+
+  /**
+   * Non-blocking API
+   * ResourceManager services call this to remove an application from the state
+   * store
+   * This does not block the dispatcher threads
+   * There is no notification of completion for this operation.
+   */
+  public synchronized void removeApplication(RMApp app) {
+    ApplicationState appState = new ApplicationState(
+        app.getSubmitTime(), app.getApplicationSubmissionContext());
+    for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
+      ApplicationAttemptState attemptState = new ApplicationAttemptState(
+          appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
+      appState.attempts.put(attemptState.getAttemptId(), attemptState);
+    }
+
+    removeApplication(appState);
+  }
+
+  /**
+   * Non-Blocking API
+   * Queues removal of the given application state on the internal dispatcher.
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized void removeApplication(ApplicationState appState) {
+    dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to remove the state of an
+   * application and its attempts
+   */
+  protected abstract void removeApplicationState(ApplicationState appState)
+      throws Exception;
+
+  // Dispatcher related code
+
+  /**
+   * Processes one store event; invoked on the internal dispatcher thread via
+   * ForwardingEventHandler. Store failures are logged; for attempt stores the
+   * outcome (including any exception) is always reported back to the attempt.
+   */
+  private synchronized void handleStoreEvent(RMStateStoreEvent event) {
+    switch(event.getType()) {
+      case STORE_APP_ATTEMPT:
+        {
+          ApplicationAttemptState attemptState =
+              ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+          Exception storedException = null;
+          ApplicationAttemptStateDataPBImpl attemptStateData =
+              new ApplicationAttemptStateDataPBImpl();
+          attemptStateData.setAttemptId(attemptState.getAttemptId());
+          attemptStateData.setMasterContainer(attemptState.getMasterContainer());
+
+          LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+          try {
+            storeApplicationAttemptState(attemptState.getAttemptId().toString(),
+                attemptStateData);
+          } catch (Exception e) {
+            LOG.error("Error storing appAttempt: "
+                + attemptState.getAttemptId(), e);
+            storedException = e;
+          } finally {
+            // always notify, passing the failure (if any) to the attempt
+            notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+                storedException);
+          }
+        }
+        break;
+      case REMOVE_APP:
+        {
+          ApplicationState appState =
+              ((RMStateStoreRemoveAppEvent) event).getAppState();
+          ApplicationId appId = appState.getAppId();
+
+          LOG.info("Removing info for app: " + appId);
+          try {
+            removeApplicationState(appState);
+          } catch (Exception e) {
+            // removal has no completion notification; failures are only logged
+            LOG.error("Error removing app: " + appId, e);
+          }
+        }
+        break;
+      default:
+        LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+    }
+  }
+
+  /**
+   * Notifies the application attempt, through the ResourceManager dispatcher
+   * set via {@link #setDispatcher(Dispatcher)}, that the store operation
+   * requested by {@link #storeApplicationAttempt(RMAppAttempt)} has finished.
+   * @param attemptId id of the attempt whose state was stored
+   * @param storedException exception raised while storing, or null on success
+   */
+  @SuppressWarnings("unchecked")
+  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
+      Exception storedException) {
+    rmDispatcher.getEventHandler().handle(
+        new RMAppAttemptStoredEvent(attemptId, storedException));
+  }
+
+  /**
+   * EventHandler implementation which forwards events to this RMStateStore.
+   * This hides the EventHandler methods of the store from its public interface
+   */
+  private final class ForwardingEventHandler
+      implements EventHandler<RMStateStoreEvent> {
+
+    @Override
+    public void handle(RMStateStoreEvent event) {
+      handleStoreEvent(event);
+    }
 }
+
}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+
+/**
+ * Store event requesting that the state of one application attempt be
+ * persisted; its type is always {@link RMStateStoreEventType#STORE_APP_ATTEMPT}.
+ */
+public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent {
+  ApplicationAttemptState attemptState;
+
+  public RMStateStoreAppAttemptEvent(ApplicationAttemptState state) {
+    super(RMStateStoreEventType.STORE_APP_ATTEMPT);
+    attemptState = state;
+  }
+
+  /** @return the attempt state carried by this event */
+  public ApplicationAttemptState getAppAttemptState() {
+    return attemptState;
+  }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base class of the events dispatched to the RMStateStore, typed by
+ * {@link RMStateStoreEventType}.
+ */
+public class RMStateStoreEvent extends AbstractEvent<RMStateStoreEventType> {
+  public RMStateStoreEvent(final RMStateStoreEventType eventType) {
+    super(eventType);
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+/**
+ * Kinds of events handled by the RMStateStore's internal dispatcher.
+ */
+public enum RMStateStoreEventType {
+ /** Persist the state of an application attempt. */
+ STORE_APP_ATTEMPT,
+ /** Remove the state of an application and its attempts. */
+ REMOVE_APP
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Instantiates the {@link RMStateStore} implementation named by
+ * {@link YarnConfiguration#RM_STORE}, falling back to
+ * {@link MemoryRMStateStore} when the key is unset.
+ */
+public class RMStateStoreFactory {
+
+  public static RMStateStore getStore(Configuration conf) {
+    Class<? extends RMStateStore> storeClass = conf.getClass(
+        YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
+        RMStateStore.class);
+    return ReflectionUtils.newInstance(storeClass, conf);
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+
+/**
+ * Store event requesting removal of an application's state together with
+ * its attempts; its type is always {@link RMStateStoreEventType#REMOVE_APP}.
+ */
+public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent {
+  ApplicationState appState;
+
+  RMStateStoreRemoveAppEvent(ApplicationState state) {
+    super(RMStateStoreEventType.REMOVE_APP);
+    appState = state;
+  }
+
+  /** @return the application state to be removed */
+  public ApplicationState getAppState() {
+    return appState;
+  }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java Wed Dec 19 04:21:18 2012
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.recovery;