You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to yarn-commits@hadoop.apache.org by ju...@apache.org on 2014/08/04 15:35:50 UTC
svn commit: r1615554 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/
hadoop-yarn/hadoop-yarn-server/ha...
Author: junping_du
Date: Mon Aug 4 13:35:49 2014
New Revision: 1615554
URL: http://svn.apache.org/r1615554
Log:
Merge r1615550 from trunk: YARN-1354. Recover applications upon nodemanager restart. (Contributed by Jason Lowe)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
- copied unchanged from r1615550, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Aug 4 13:35:49 2014
@@ -17,6 +17,9 @@ Release 2.6.0 - UNRELEASED
YARN-2181. Added preemption info to logs and RM web UI. (Wangda Tan via
jianhe)
+ YARN-1354. Recover applications upon nodemanager restart. (Jason Lowe via
+ junping_du)
+
IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Mon Aug 4 13:35:49 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
import static org.apache.hadoop.service.Service.STATE.STARTED;
+import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
@@ -42,6 +43,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -119,11 +126,13 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
@@ -224,14 +233,49 @@ public class ContainerManagerImpl extend
recover();
}
+ @SuppressWarnings("unchecked")
private void recover() throws IOException, URISyntaxException {
NMStateStoreService stateStore = context.getNMStateStore();
if (stateStore.canRecover()) {
rsrcLocalizationSrvc.recoverLocalizedResources(
stateStore.loadLocalizationState());
+
+ RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
+ for (ContainerManagerApplicationProto proto :
+ appsState.getApplications()) {
+ recoverApplication(proto);
+ }
+
+ String diagnostic = "Application marked finished during recovery";
+ for (ApplicationId appId : appsState.getFinishedApplications()) {
+ dispatcher.getEventHandler().handle(
+ new ApplicationFinishEvent(appId, diagnostic));
+ }
}
}
+ private void recoverApplication(ContainerManagerApplicationProto p)
+ throws IOException {
+ ApplicationId appId = new ApplicationIdPBImpl(p.getId());
+ Credentials creds = new Credentials();
+ creds.readTokenStorageStream(
+ new DataInputStream(p.getCredentials().newInput()));
+
+ List<ApplicationACLMapProto> aclProtoList = p.getAclsList();
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>(aclProtoList.size());
+ for (ApplicationACLMapProto aclProto : aclProtoList) {
+ acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()),
+ aclProto.getAcl());
+ }
+
+ LOG.info("Recovering application " + appId);
+ ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
+ creds, context);
+ context.getApplications().put(appId, app);
+ app.handle(new ApplicationInitEvent(appId, acls));
+ }
+
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@@ -358,6 +402,12 @@ public class ContainerManagerImpl extend
}
LOG.info("Applications still running : " + applications.keySet());
+ if (this.context.getNMStateStore().canRecover()
+ && !this.context.getDecommissioned()) {
+ // do not cleanup apps as they can be recovered on restart
+ return;
+ }
+
List<ApplicationId> appIds =
new ArrayList<ApplicationId>(applications.keySet());
this.handle(
@@ -567,6 +617,41 @@ public class ContainerManagerImpl extend
succeededContainers, failedContainers);
}
+ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
+ String user, Credentials credentials,
+ Map<ApplicationAccessType, String> appAcls) {
+
+ ContainerManagerApplicationProto.Builder builder =
+ ContainerManagerApplicationProto.newBuilder();
+ builder.setId(((ApplicationIdPBImpl) appId).getProto());
+ builder.setUser(user);
+
+ builder.clearCredentials();
+ if (credentials != null) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ try {
+ credentials.writeTokenStorageToStream(dob);
+ builder.setCredentials(ByteString.copyFrom(dob.getData()));
+ } catch (IOException e) {
+ // should not occur
+ LOG.error("Cannot serialize credentials", e);
+ }
+ }
+
+ builder.clearAcls();
+ if (appAcls != null) {
+ for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
+ ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder()
+ .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
+ .setAcl(acl.getValue())
+ .build();
+ builder.addAcls(p);
+ }
+ }
+
+ return builder.build();
+ }
+
@SuppressWarnings("unchecked")
private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
@@ -640,10 +725,12 @@ public class ContainerManagerImpl extend
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
-
+ Map<ApplicationAccessType, String> appAcls =
+ container.getLaunchContext().getApplicationACLs();
+ context.getNMStateStore().storeApplication(applicationID,
+ buildAppProto(applicationID, user, credentials, appAcls));
dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, container.getLaunchContext()
- .getApplicationACLs()));
+ new ApplicationInitEvent(applicationID, appAcls));
}
dispatcher.getEventHandler().handle(
@@ -894,6 +981,11 @@ public class ContainerManagerImpl extend
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
+ try {
+ this.context.getNMStateStore().storeFinishedApplication(appID);
+ } catch (IOException e) {
+ LOG.error("Unable to update application state in store", e);
+ }
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
diagnostic));
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Mon Aug 4 13:35:49 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -428,6 +429,11 @@ public class ApplicationImpl implements
ApplicationId appId = event.getApplicationID();
app.context.getApplications().remove(appId);
app.aclsManager.removeApplication(appId);
+ try {
+ app.context.getNMStateStore().removeApplication(appId);
+ } catch (IOException e) {
+ LOG.error("Unable to remove application from state store", e);
+ }
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Mon Aug 4 13:35:49 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -74,6 +75,11 @@ public class NMLeveldbStateStoreService
private static final String DELETION_TASK_KEY_PREFIX =
"DeletionService/deltask_";
+ private static final String APPLICATIONS_KEY_PREFIX =
+ "ContainerManager/applications/";
+ private static final String FINISHED_APPS_KEY_PREFIX =
+ "ContainerManager/finishedApps/";
+
private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
LOCALIZATION_KEY_PREFIX + "public/";
@@ -117,6 +123,92 @@ public class NMLeveldbStateStoreService
@Override
+ public RecoveredApplicationsState loadApplicationsState()
+ throws IOException {
+ RecoveredApplicationsState state = new RecoveredApplicationsState();
+ state.applications = new ArrayList<ContainerManagerApplicationProto>();
+ String keyPrefix = APPLICATIONS_KEY_PREFIX;
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(db);
+ iter.seek(bytes(keyPrefix));
+ while (iter.hasNext()) {
+ Entry<byte[], byte[]> entry = iter.next();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(keyPrefix)) {
+ break;
+ }
+ state.applications.add(
+ ContainerManagerApplicationProto.parseFrom(entry.getValue()));
+ }
+
+ state.finishedApplications = new ArrayList<ApplicationId>();
+ keyPrefix = FINISHED_APPS_KEY_PREFIX;
+ iter.seek(bytes(keyPrefix));
+ while (iter.hasNext()) {
+ Entry<byte[], byte[]> entry = iter.next();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(keyPrefix)) {
+ break;
+ }
+ ApplicationId appId =
+ ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
+ state.finishedApplications.add(appId);
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+
+ return state;
+ }
+
+ @Override
+ public void storeApplication(ApplicationId appId,
+ ContainerManagerApplicationProto p) throws IOException {
+ String key = APPLICATIONS_KEY_PREFIX + appId;
+ try {
+ db.put(bytes(key), p.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void storeFinishedApplication(ApplicationId appId)
+ throws IOException {
+ String key = FINISHED_APPS_KEY_PREFIX + appId;
+ try {
+ db.put(bytes(key), new byte[0]);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void removeApplication(ApplicationId appId)
+ throws IOException {
+ try {
+ WriteBatch batch = db.createWriteBatch();
+ try {
+ String key = APPLICATIONS_KEY_PREFIX + appId;
+ batch.delete(bytes(key));
+ key = FINISHED_APPS_KEY_PREFIX + appId;
+ batch.delete(bytes(key));
+ db.write(batch);
+ } finally {
+ batch.close();
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
public RecoveredLocalizationState loadLocalizationState()
throws IOException {
RecoveredLocalizationState state = new RecoveredLocalizationState();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Mon Aug 4 13:35:49 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -43,6 +44,25 @@ public class NMNullStateStoreService ext
}
@Override
+ public RecoveredApplicationsState loadApplicationsState() throws IOException {
+ throw new UnsupportedOperationException(
+ "Recovery not supported by this state store");
+ }
+
+ @Override
+ public void storeApplication(ApplicationId appId,
+ ContainerManagerApplicationProto p) throws IOException {
+ }
+
+ @Override
+ public void storeFinishedApplication(ApplicationId appId) {
+ }
+
+ @Override
+ public void removeApplication(ApplicationId appId) throws IOException {
+ }
+
+ @Override
public RecoveredLocalizationState loadLocalizationState()
throws IOException {
throw new UnsupportedOperationException(
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Mon Aug 4 13:35:49 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -45,6 +46,19 @@ public abstract class NMStateStoreServic
super(name);
}
+ public static class RecoveredApplicationsState {
+ List<ContainerManagerApplicationProto> applications;
+ List<ApplicationId> finishedApplications;
+
+ public List<ContainerManagerApplicationProto> getApplications() {
+ return applications;
+ }
+
+ public List<ApplicationId> getFinishedApplications() {
+ return finishedApplications;
+ }
+ }
+
public static class LocalResourceTrackerState {
List<LocalizedResourceProto> localizedResources =
new ArrayList<LocalizedResourceProto>();
@@ -162,6 +176,19 @@ public abstract class NMStateStoreServic
}
+ public abstract RecoveredApplicationsState loadApplicationsState()
+ throws IOException;
+
+ public abstract void storeApplication(ApplicationId appId,
+ ContainerManagerApplicationProto p) throws IOException;
+
+ public abstract void storeFinishedApplication(ApplicationId appId)
+ throws IOException;
+
+ public abstract void removeApplication(ApplicationId appId)
+ throws IOException;
+
+
/**
* Load the state of localized resources
* @return recovered localized resource state
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Mon Aug 4 13:35:49 2014
@@ -24,6 +24,13 @@ package hadoop.yarn;
import "yarn_protos.proto";
+message ContainerManagerApplicationProto {
+ optional ApplicationIdProto id = 1;
+ optional string user = 2;
+ optional bytes credentials = 3;
+ repeated ApplicationACLMapProto acls = 4;
+}
+
message DeletionServiceDeleteTaskProto {
optional int32 id = 1;
optional string user = 2;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Aug 4 13:35:49 2014
@@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -91,8 +93,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@SuppressWarnings("rawtypes")
public class TestNodeStatusUpdater {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Mon Aug 4 13:35:49 2014
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -722,6 +723,8 @@ public class TestContainer {
Context context = mock(Context.class);
when(context.getApplications()).thenReturn(
new ConcurrentHashMap<ApplicationId, Application>());
+ NMNullStateStoreService stateStore = new NMNullStateStoreService();
+ when(context.getNMStateStore()).thenReturn(stateStore);
ContainerExecutor executor = mock(ContainerExecutor.class);
launcher =
new ContainersLauncher(context, dispatcher, executor, null, null);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Mon Aug 4 13:35:49 2014
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.no
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -29,12 +31,15 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class NMMemoryStateStoreService extends NMStateStoreService {
+ private Map<ApplicationId, ContainerManagerApplicationProto> apps;
+ private Set<ApplicationId> finishedApps;
private Map<TrackerKey, TrackerState> trackerStates;
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
private RecoveredNMTokensState nmTokenState;
@@ -44,6 +49,58 @@ public class NMMemoryStateStoreService e
super(NMMemoryStateStoreService.class.getName());
}
+ /**
+ * Allocates the empty in-memory collections that stand in for persistent
+ * storage; every piece of recoverable state starts out empty.
+ *
+ * @param conf not used by this in-memory implementation
+ */
+ @Override
+ protected void initStorage(Configuration conf) {
+ // application records and the set of IDs marked as finished
+ apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
+ finishedApps = new HashSet<ApplicationId>();
+ nmTokenState = new RecoveredNMTokensState();
+ nmTokenState.applicationMasterKeys =
+ new HashMap<ApplicationAttemptId, MasterKey>();
+ containerTokenState = new RecoveredContainerTokensState();
+ containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
+ trackerStates = new HashMap<TrackerKey, TrackerState>();
+ deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+ }
+
+ /** No-op: state lives only in in-memory collections, nothing to open. */
+ @Override
+ protected void startStorage() {
+ }
+
+ /** No-op: the in-memory collections are left as-is; nothing to release. */
+ @Override
+ protected void closeStorage() {
+ }
+
+
+ /**
+ * Returns a snapshot of the stored application records and of the IDs
+ * marked as finished. Both lists are fresh copies, so later store
+ * mutations do not affect them; ordering follows the backing hash
+ * collections and is therefore unspecified.
+ */
+ @Override
+ public RecoveredApplicationsState loadApplicationsState()
+ throws IOException {
+ RecoveredApplicationsState state = new RecoveredApplicationsState();
+ state.applications = new ArrayList<ContainerManagerApplicationProto>(
+ apps.values());
+ state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
+ return state;
+ }
+
+ /**
+ * Stores, or replaces, the record for {@code appId}.
+ */
+ @Override
+ public void storeApplication(ApplicationId appId,
+ ContainerManagerApplicationProto proto) throws IOException {
+ // Round-trip through the serialized bytes so the store keeps its own copy
+ // (presumably to mimic what a persistent store would read back).
+ ContainerManagerApplicationProto protoCopy =
+ ContainerManagerApplicationProto.parseFrom(proto.toByteString());
+ apps.put(appId, protoCopy);
+ }
+
+ /**
+ * Marks {@code appId} as finished. Any record stored for the app is left
+ * in place; removeApplication is what drops it.
+ */
+ @Override
+ public void storeFinishedApplication(ApplicationId appId) {
+ finishedApps.add(appId);
+ }
+
+ /** Forgets {@code appId} entirely: its record and its finished marker. */
+ @Override
+ public void removeApplication(ApplicationId appId) throws IOException {
+ apps.remove(appId);
+ finishedApps.remove(appId);
+ }
+
+
private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
LocalResourceTrackerState result = new LocalResourceTrackerState();
result.localizedResources.addAll(ts.localizedResources.values());
@@ -117,25 +174,6 @@ public class NMMemoryStateStoreService e
}
}
- @Override
- protected void initStorage(Configuration conf) {
- nmTokenState = new RecoveredNMTokensState();
- nmTokenState.applicationMasterKeys =
- new HashMap<ApplicationAttemptId, MasterKey>();
- containerTokenState = new RecoveredContainerTokensState();
- containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
- trackerStates = new HashMap<TrackerKey, TrackerState>();
- deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
- }
-
- @Override
- protected void startStorage() {
- }
-
- @Override
- protected void closeStorage() {
- }
-
@Override
public RecoveredDeletionServiceState loadDeletionServiceState()
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Mon Aug 4 13:35:49 2014
@@ -37,13 +37,16 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
@@ -142,6 +145,54 @@ public class TestNMLeveldbStateStoreServ
}
@Test
+ public void testApplicationStorage() throws IOException {
+ // test empty when no state
+ RecoveredApplicationsState state = stateStore.loadApplicationsState();
+ assertTrue(state.getApplications().isEmpty());
+ assertTrue(state.getFinishedApplications().isEmpty());
+
+ // store an application and verify recovered
+ // (restartStateStore() presumably closes and reopens the store, so each
+ // load below observes persisted data rather than in-memory leftovers)
+ final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+ ContainerManagerApplicationProto.Builder builder =
+ ContainerManagerApplicationProto.newBuilder();
+ builder.setId(((ApplicationIdPBImpl) appId1).getProto());
+ builder.setUser("user1");
+ ContainerManagerApplicationProto appProto1 = builder.build();
+ stateStore.storeApplication(appId1, appProto1);
+ restartStateStore();
+ state = stateStore.loadApplicationsState();
+ assertEquals(1, state.getApplications().size());
+ assertEquals(appProto1, state.getApplications().get(0));
+ assertTrue(state.getFinishedApplications().isEmpty());
+
+ // finish an application and add a new one
+ stateStore.storeFinishedApplication(appId1);
+ final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
+ builder = ContainerManagerApplicationProto.newBuilder();
+ builder.setId(((ApplicationIdPBImpl) appId2).getProto());
+ builder.setUser("user2");
+ ContainerManagerApplicationProto appProto2 = builder.build();
+ stateStore.storeApplication(appId2, appProto2);
+ restartStateStore();
+ state = stateStore.loadApplicationsState();
+ // a finished app keeps its record: both apps load, only appId1 is finished
+ assertEquals(2, state.getApplications().size());
+ assertTrue(state.getApplications().contains(appProto1));
+ assertTrue(state.getApplications().contains(appProto2));
+ assertEquals(1, state.getFinishedApplications().size());
+ assertEquals(appId1, state.getFinishedApplications().get(0));
+
+ // test removing an application
+ // removal must drop both the record and the finished marker of appId2
+ stateStore.storeFinishedApplication(appId2);
+ stateStore.removeApplication(appId2);
+ restartStateStore();
+ state = stateStore.loadApplicationsState();
+ assertEquals(1, state.getApplications().size());
+ assertEquals(appProto1, state.getApplications().get(0));
+ assertEquals(1, state.getFinishedApplications().size());
+ assertEquals(appId1, state.getFinishedApplications().get(0));
+ }
+
+ @Test
public void testStartResourceLocalization() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Aug 4 13:35:49 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -312,7 +311,8 @@ public class ResourceTrackerService exte
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeReconnectEvent(nodeId, rmNode));
+ new RMNodeReconnectEvent(nodeId, rmNode,
+ request.getRunningApplications()));
}
// On every node manager register we will be clearing NMToken keys if
// present for any running application.
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Aug 4 13:35:49 2014
@@ -1191,6 +1191,9 @@ public class RMAppImpl implements RMApp,
+ /**
+ * Returns true if the app is FAILED, FINISHED or KILLED, preferring its
+ * recovered final state and otherwise using its current state.
+ * NOTE(review): casts rmApp to RMAppImpl unchecked; assumes callers only
+ * pass RMAppImpl instances.
+ */
public static boolean isAppInFinalState(RMApp rmApp) {
RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
+ // no recovered final state available: fall back to the live state
+ if (appState == null) {
+ appState = rmApp.getState();
+ }
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Mon Aug 4 13:35:49 2014
@@ -456,6 +456,24 @@ public class RMNodeImpl implements RMNod
}
}
+ /**
+ * Handles a node reporting {@code appId} as running on it. If the RM has
+ * no RMApp for the ID, the app is queued in the node's finishedApplications
+ * so it gets cleaned up on the NM; otherwise an RMAppRunningOnNodeEvent is
+ * dispatched for the app and node.
+ */
+ private static void handleRunningAppOnNode(RMNodeImpl rmNode,
+ RMContext context, ApplicationId appId, NodeId nodeId) {
+ RMApp app = context.getRMApps().get(appId);
+
+ // The RM has no record of this app, which is unexpected; add it to the
+ // node's finishedApplications list so that the NM cleans the app up
+ // instead of keeping it around.
+ if (null == app) {
+ LOG.warn("Cannot get RMApp by appId=" + appId
+ + ", just added it to finishedApplications list for cleanup");
+ rmNode.finishedApplications.add(appId);
+ return;
+ }
+
+ context.getDispatcher().getEventHandler()
+ .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
+ }
+
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@@ -496,24 +514,6 @@ public class RMNodeImpl implements RMNod
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
-
- void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
- ApplicationId appId, NodeId nodeId) {
- RMApp app = context.getRMApps().get(appId);
-
- // if we failed getting app by appId, maybe something wrong happened, just
- // add the app to the finishedApplications list so that the app can be
- // cleaned up on the NM
- if (null == app) {
- LOG.warn("Cannot get RMApp by appId=" + appId
- + ", just added it to finishedApplications list for cleanup");
- rmNode.finishedApplications.add(appId);
- return;
- }
-
- context.getDispatcher().getEventHandler()
- .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
- }
}
public static class ReconnectNodeTransition implements
@@ -526,7 +526,8 @@ public class RMNodeImpl implements RMNod
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
- RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
+ RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
+ RMNode newNode = reconnectEvent.getReconnectedNode();
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
&& rmNode.getHttpPort() == newNode.getHttpPort()) {
@@ -551,6 +552,13 @@ public class RMNodeImpl implements RMNod
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
}
+
+ if (null != reconnectEvent.getRunningApplications()) {
+ for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
+ handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
+ }
+ }
+
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Mon Aug 4 13:35:49 2014
@@ -18,17 +18,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+/**
+ * RECONNECTED event for a node, carrying the newly registered node and the
+ * applications the reconnecting NM reported as running.
+ */
public class RMNodeReconnectEvent extends RMNodeEvent {
private RMNode reconnectedNode;
+ // apps reported running by the reconnecting NM; may be null
+ private List<ApplicationId> runningApplications;
- public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+ public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
+ List<ApplicationId> runningApps) {
super(nodeId, RMNodeEventType.RECONNECTED);
reconnectedNode = newNode;
+ runningApplications = runningApps;
}
public RMNode getReconnectedNode() {
return reconnectedNode;
}
+
+ /**
+ * @return the running-application list given at construction, stored
+ * as-is (not copied); may be null
+ */
+ public List<ApplicationId> getRunningApplications() {
+ return runningApplications;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Mon Aug 4 13:35:49 2014
@@ -449,6 +449,35 @@ public class TestApplicationCleanup {
rm2.stop();
}
+ /**
+ * Verifies that an NM which re-registers while still reporting an
+ * already-failed app as running is told again to clean that app up.
+ */
+ @Test (timeout = 60000)
+ public void testAppCleanupWhenNMReconnects() throws Exception {
+ // single AM attempt: one completed AM container fails the whole app
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create app and launch the AM
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = launchAM(app0, rm1, nm1);
+ nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+ // wait for application cleanup message received
+ waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+ // reconnect NM with application still active
+ // (the NM reports the FAILED app as running; the RM must ask for cleanup again)
+ nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+ waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+
+ rm1.stop();
+ }
+
public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup();
t.testAppCleanup();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1615554&r1=1615553&r2=1615554&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Aug 4 13:35:49 2014
@@ -520,7 +520,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
+ node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
@@ -542,7 +542,8 @@ public class TestRMNodeTransitions {
RMNodeImpl node = getRunningNode(nmVersion1);
Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
- node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode));
+ node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
+ null));
Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
}
}