You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:59 UTC
svn commit: r1619019 [4/10] - in
/hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/
hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/had...
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Aug 20 01:34:29 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
import static org.apache.hadoop.service.Service.STATE.STARTED;
+import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
@@ -42,6 +43,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -119,11 +126,15 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
@@ -224,11 +235,104 @@ public class ContainerManagerImpl extend
recover();
}
+ /**
+ * Restores NodeManager state from the NM state store when the store
+ * supports recovery. The order is: localized resources, then applications,
+ * then containers, and finally an ApplicationFinishEvent for every
+ * application the store recorded as finished.
+ *
+ * @throws IOException propagated from the state-store loads or from
+ *     recoverApplication / recoverContainer
+ * @throws URISyntaxException declared here; presumably raised while
+ *     recovering localized resources (the throwing call is not visible here)
+ */
+ // presumably needed because the dispatcher's EventHandler is untyped
+ @SuppressWarnings("unchecked")
private void recover() throws IOException, URISyntaxException {
NMStateStoreService stateStore = context.getNMStateStore();
if (stateStore.canRecover()) {
rsrcLocalizationSrvc.recoverLocalizedResources(
stateStore.loadLocalizationState());
+
+ // Applications must be recovered before containers: recoverContainer only
+ // re-creates a container whose application is already present in
+ // context.getApplications(); otherwise it reports it as completed.
+ RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
+ for (ContainerManagerApplicationProto proto :
+ appsState.getApplications()) {
+ recoverApplication(proto);
+ }
+
+ for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
+ recoverContainer(rcs);
+ }
+
+ // Finish events are dispatched after all container init events have been
+ // posted, presumably so they are handled after the recovered containers
+ // are attached to their applications.
+ String diagnostic = "Application marked finished during recovery";
+ for (ApplicationId appId : appsState.getFinishedApplications()) {
+ dispatcher.getEventHandler().handle(
+ new ApplicationFinishEvent(appId, diagnostic));
+ }
+ }
+ }
+
+ /**
+ * Re-creates an ApplicationImpl from its persisted state-store record.
+ * The record supplies the application id, the user, the serialized
+ * credentials and the ACL map. The application is registered in the
+ * NM context and then initialised.
+ *
+ * @param p the persisted application record
+ * @throws IOException if the stored credentials cannot be deserialized
+ */
+ private void recoverApplication(ContainerManagerApplicationProto p)
+ throws IOException {
+ ApplicationId appId = new ApplicationIdPBImpl(p.getId());
+ Credentials creds = new Credentials();
+ // NOTE(review): buildAppProto omits credentials when they are null, so the
+ // stream here could be empty; confirm readTokenStorageStream tolerates that.
+ creds.readTokenStorageStream(
+ new DataInputStream(p.getCredentials().newInput()));
+
+ List<ApplicationACLMapProto> aclProtoList = p.getAclsList();
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>(aclProtoList.size());
+ for (ApplicationACLMapProto aclProto : aclProtoList) {
+ acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()),
+ aclProto.getAcl());
+ }
+
+ LOG.info("Recovering application " + appId);
+ ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
+ creds, context);
+ context.getApplications().put(appId, app);
+ // Handed directly to app.handle() rather than posted via the dispatcher,
+ // so the init event is processed on this thread during recovery.
+ app.handle(new ApplicationInitEvent(appId, acls));
+ }
+
+ /**
+ * Re-creates a container from its recovered state-store entry.
+ *
+ * If the owning application was recovered, a ContainerImpl carrying the
+ * recovered status, exit code, diagnostics and killed flag is built and
+ * registered. An ApplicationContainerInitEvent is then dispatched for it.
+ *
+ * Otherwise the container is only reported to the node status updater as
+ * completed. A warning is logged unless the store already recorded it as
+ * COMPLETED.
+ *
+ * @param rcs the recovered container state
+ * @throws IOException if the launch context's tokens cannot be parsed
+ */
+ @SuppressWarnings("unchecked")
+ private void recoverContainer(RecoveredContainerState rcs)
+ throws IOException {
+ StartContainerRequest req = rcs.getStartRequest();
+ ContainerLaunchContext launchContext = req.getContainerLaunchContext();
+ ContainerTokenIdentifier token =
+ BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+ ContainerId containerId = token.getContainerID();
+ ApplicationId appId =
+ containerId.getApplicationAttemptId().getApplicationId();
+
+ LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
+ + " with exit code " + rcs.getExitCode());
+
+ if (context.getApplications().containsKey(appId)) {
+ Credentials credentials = parseCredentials(launchContext);
+ // req.getContainerLaunchContext() is the same object as launchContext.
+ Container container = new ContainerImpl(getConfig(), dispatcher,
+ context.getNMStateStore(), req.getContainerLaunchContext(),
+ credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
+ rcs.getDiagnostics(), rcs.getKilled());
+ context.getContainers().put(containerId, container);
+ dispatcher.getEventHandler().handle(
+ new ApplicationContainerInitEvent(container));
+ } else {
+ if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
+ LOG.warn(containerId + " has no corresponding application!");
+ }
+ LOG.info("Adding " + containerId + " to recently stopped containers");
+ nodeStatusUpdater.addCompletedContainer(containerId);
+ }
+ }
+
+ /**
+ * Blocks until no container in the NM context is still in the NEW state,
+ * or until the poll budget runs out. serviceStart calls this before
+ * starting the RPC server when recovery is enabled.
+ *
+ * The context is polled up to 100 times with a 100 ms sleep after each
+ * unsuccessful poll (roughly 10 s in total). On timeout only a warning is
+ * logged. Note that the last sleep is not followed by a recheck.
+ *
+ * @throws InterruptedException if interrupted while sleeping
+ */
+ private void waitForRecoveredContainers() throws InterruptedException {
+ final int sleepMsec = 100;
+ int waitIterations = 100;
+ List<ContainerId> newContainers = new ArrayList<ContainerId>();
+ while (--waitIterations >= 0) {
+ newContainers.clear();
+ for (Container container : context.getContainers().values()) {
+ // Fully qualified, presumably to avoid a clash with another
+ // ContainerState type imported in this file.
+ if (container.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) {
+ newContainers.add(container.getContainerId());
+ }
+ }
+ if (newContainers.isEmpty()) {
+ break;
+ }
+ LOG.info("Waiting for containers: " + newContainers);
+ Thread.sleep(sleepMsec);
+ }
+ // waitIterations only drops below zero when the loop ran out without a break.
+ if (waitIterations < 0) {
+ LOG.warn("Timeout waiting for recovered containers");
}
}
@@ -265,6 +369,23 @@ public class ContainerManagerImpl extend
// Enqueue user dirs in deletion context
Configuration conf = getConfig();
+ final InetSocketAddress initialAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_PORT);
+ boolean usingEphemeralPort = (initialAddress.getPort() == 0);
+ if (context.getNMStateStore().canRecover() && usingEphemeralPort) {
+ throw new IllegalArgumentException("Cannot support recovery with an "
+ + "ephemeral server port. Check the setting of "
+ + YarnConfiguration.NM_ADDRESS);
+ }
+ // If recovering then delay opening the RPC service until the recovery
+ // of resources and containers have completed, otherwise requests from
+ // clients during recovery can interfere with the recovery process.
+ final boolean delayedRpcServerStart =
+ context.getNMStateStore().canRecover();
+
Configuration serverConf = new Configuration(conf);
// always enforce it to be token-based.
@@ -274,11 +395,6 @@ public class ContainerManagerImpl extend
YarnRPC rpc = YarnRPC.create(conf);
- InetSocketAddress initialAddress = conf.getSocketAddr(
- YarnConfiguration.NM_ADDRESS,
- YarnConfiguration.DEFAULT_NM_ADDRESS,
- YarnConfiguration.DEFAULT_NM_PORT);
-
server =
rpc.getServer(ContainerManagementProtocol.class, this, initialAddress,
serverConf, this.context.getNMTokenSecretManager(),
@@ -295,16 +411,61 @@ public class ContainerManagerImpl extend
LOG.info("Blocking new container-requests as container manager rpc" +
" server is still starting.");
this.setBlockNewContainerRequests(true);
- server.start();
- InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
- NodeId nodeId = NodeId.newInstance(
- connectAddress.getAddress().getCanonicalHostName(),
- connectAddress.getPort());
+
+ String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
+ String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
+ String hostOverride = null;
+ if (bindHost != null && !bindHost.isEmpty()
+ && nmAddress != null && !nmAddress.isEmpty()) {
+ //a bind-host case with an address, to support overriding the first
+ //hostname found when querying for our hostname with the specified
+ //address, combine the specified address with the actual port listened
+ //on by the server
+ hostOverride = nmAddress.split(":")[0];
+ }
+
+ // setup node ID
+ InetSocketAddress connectAddress;
+ if (delayedRpcServerStart) {
+ connectAddress = NetUtils.getConnectAddress(initialAddress);
+ } else {
+ server.start();
+ connectAddress = NetUtils.getConnectAddress(server);
+ }
+ NodeId nodeId = buildNodeId(connectAddress, hostOverride);
((NodeManager.NMContext)context).setNodeId(nodeId);
this.context.getNMTokenSecretManager().setNodeId(nodeId);
this.context.getContainerTokenSecretManager().setNodeId(nodeId);
- LOG.info("ContainerManager started at " + connectAddress);
+
+ // start remaining services
super.serviceStart();
+
+ if (delayedRpcServerStart) {
+ waitForRecoveredContainers();
+ server.start();
+
+ // check that the node ID is as previously advertised
+ connectAddress = NetUtils.getConnectAddress(server);
+ NodeId serverNode = buildNodeId(connectAddress, hostOverride);
+ if (!serverNode.equals(nodeId)) {
+ throw new IOException("Node mismatch after server started, expected '"
+ + nodeId + "' but found '" + serverNode + "'");
+ }
+ }
+
+ LOG.info("ContainerManager started at " + connectAddress);
+ LOG.info("ContainerManager bound to " + initialAddress);
+ }
+
+ /**
+ * Builds the NodeId advertised by this NodeManager.
+ *
+ * The host is the canonical host name of connectAddress. If hostOverride is
+ * non-null, that host is resolved instead, keeping connectAddress's port.
+ * serviceStart calls this before and after the delayed RPC server start
+ * and fails if the two NodeIds differ.
+ *
+ * @param connectAddress address whose port (and, without override, host) is used
+ * @param hostOverride host name to use instead, or null
+ * @return the NodeId for this node
+ */
+ private NodeId buildNodeId(InetSocketAddress connectAddress,
+ String hostOverride) {
+ if (hostOverride != null) {
+ // NOTE(review): if hostOverride cannot be resolved, getAddress() below
+ // may be null; confirm NetUtils.getConnectAddress guarantees resolution.
+ connectAddress = NetUtils.getConnectAddress(
+ new InetSocketAddress(hostOverride, connectAddress.getPort()));
+ }
+ return NodeId.newInstance(
+ connectAddress.getAddress().getCanonicalHostName(),
+ connectAddress.getPort());
}
void refreshServiceAcls(Configuration configuration,
@@ -341,6 +502,12 @@ public class ContainerManagerImpl extend
}
LOG.info("Applications still running : " + applications.keySet());
+ if (this.context.getNMStateStore().canRecover()
+ && !this.context.getDecommissioned()) {
+ // do not cleanup apps as they can be recovered on restart
+ return;
+ }
+
List<ApplicationId> appIds =
new ArrayList<ApplicationId>(applications.keySet());
this.handle(
@@ -497,6 +664,8 @@ public class ContainerManagerImpl extend
messageBuilder.append("\nThis token is expired. current time is ")
.append(System.currentTimeMillis()).append(" found ")
.append(containerTokenIdentifier.getExpiryTimeStamp());
+ messageBuilder.append("\nNote: System times on machines may be out of sync.")
+ .append(" Check system time and time zones.");
}
if (unauthorized) {
String msg = messageBuilder.toString();
@@ -548,6 +717,41 @@ public class ContainerManagerImpl extend
succeededContainers, failedContainers);
}
+ /**
+ * Builds the state-store record for an application: its id, user,
+ * serialized credentials and ACLs. recoverApplication reads this record
+ * back after a restart.
+ *
+ * @param appId application id; must be an ApplicationIdPBImpl, since its
+ *     proto is taken directly
+ * @param user user recorded for the application
+ * @param credentials credentials to persist, or null to persist none
+ * @param appAcls ACLs to persist, or null to persist none
+ * @return the populated ContainerManagerApplicationProto
+ */
+ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
+ String user, Credentials credentials,
+ Map<ApplicationAccessType, String> appAcls) {
+
+ ContainerManagerApplicationProto.Builder builder =
+ ContainerManagerApplicationProto.newBuilder();
+ builder.setId(((ApplicationIdPBImpl) appId).getProto());
+ builder.setUser(user);
+
+ if (credentials != null) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ try {
+ credentials.writeTokenStorageToStream(dob);
+ // getData() exposes the whole backing array, which is normally larger
+ // than what was written; only the first getLength() bytes are valid.
+ builder.setCredentials(
+ ByteString.copyFrom(dob.getData(), 0, dob.getLength()));
+ } catch (IOException e) {
+ // Writing to an in-memory buffer is not expected to fail; log it
+ // rather than failing the container start.
+ LOG.error("Cannot serialize credentials", e);
+ }
+ }
+
+ if (appAcls != null) {
+ for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
+ ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder()
+ .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
+ .setAcl(acl.getValue())
+ .build();
+ builder.addAcls(p);
+ }
+ }
+
+ return builder.build();
+ }
+
@SuppressWarnings("unchecked")
private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
@@ -600,7 +804,8 @@ public class ContainerManagerImpl extend
Credentials credentials = parseCredentials(launchContext);
Container container =
- new ContainerImpl(getConfig(), this.dispatcher, launchContext,
+ new ContainerImpl(getConfig(), this.dispatcher,
+ context.getNMStateStore(), launchContext,
credentials, metrics, containerTokenIdentifier);
ApplicationId applicationID =
containerId.getApplicationAttemptId().getApplicationId();
@@ -621,12 +826,15 @@ public class ContainerManagerImpl extend
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
-
+ Map<ApplicationAccessType, String> appAcls =
+ container.getLaunchContext().getApplicationACLs();
+ context.getNMStateStore().storeApplication(applicationID,
+ buildAppProto(applicationID, user, credentials, appAcls));
dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, container.getLaunchContext()
- .getApplicationACLs()));
+ new ApplicationInitEvent(applicationID, appAcls));
}
+ this.context.getNMStateStore().storeContainer(containerId, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
@@ -674,7 +882,7 @@ public class ContainerManagerImpl extend
}
private Credentials parseCredentials(ContainerLaunchContext launchContext)
- throws YarnException {
+ throws IOException {
Credentials credentials = new Credentials();
// //////////// Parse credentials
ByteBuffer tokens = launchContext.getTokens();
@@ -683,15 +891,11 @@ public class ContainerManagerImpl extend
DataInputByteBuffer buf = new DataInputByteBuffer();
tokens.rewind();
buf.reset(tokens);
- try {
- credentials.readTokenStorageStream(buf);
- if (LOG.isDebugEnabled()) {
- for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
- LOG.debug(tk.getService() + " = " + tk.toString());
- }
+ credentials.readTokenStorageStream(buf);
+ if (LOG.isDebugEnabled()) {
+ for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
+ LOG.debug(tk.getService() + " = " + tk.toString());
}
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
}
}
// //////////// End of parsing credentials
@@ -724,7 +928,7 @@ public class ContainerManagerImpl extend
@SuppressWarnings("unchecked")
private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
- ContainerId containerID) throws YarnException {
+ ContainerId containerID) throws YarnException, IOException {
String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID);
LOG.info("Stopping container with container Id: " + containerIDStr);
@@ -737,6 +941,7 @@ public class ContainerManagerImpl extend
+ " is not handled by this NodeManager");
}
} else {
+ context.getNMStateStore().storeContainerKilled(containerID);
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
ContainerExitStatus.KILLED_BY_APPMASTER,
@@ -875,6 +1080,11 @@ public class ContainerManagerImpl extend
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
+ try {
+ this.context.getNMStateStore().storeFinishedApplication(appID);
+ } catch (IOException e) {
+ LOG.error("Unable to update application state in store", e);
+ }
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
diagnostic));
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Wed Aug 20 01:34:29 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -428,6 +429,11 @@ public class ApplicationImpl implements
ApplicationId appId = event.getApplicationID();
app.context.getApplications().remove(appId);
app.aclsManager.removeApplication(appId);
+ try {
+ app.context.getNMStateStore().removeApplication(appId);
+ } catch (IOException e) {
+ LOG.error("Unable to remove application from state store", e);
+ }
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed Aug 20 01:34:29 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -75,6 +78,7 @@ public class ContainerImpl implements Co
private final Lock readLock;
private final Lock writeLock;
private final Dispatcher dispatcher;
+ private final NMStateStoreService stateStore;
private final Credentials credentials;
private final NodeManagerMetrics metrics;
private final ContainerLaunchContext launchContext;
@@ -101,12 +105,19 @@ public class ContainerImpl implements Co
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
+ // whether container has been recovered after a restart
+ private RecoveredContainerStatus recoveredStatus =
+ RecoveredContainerStatus.REQUESTED;
+ // whether container was marked as killed after recovery
+ private boolean recoveredAsKilled = false;
+
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
- ContainerLaunchContext launchContext, Credentials creds,
- NodeManagerMetrics metrics,
+ NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+ Credentials creds, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier) {
this.daemonConf = conf;
this.dispatcher = dispatcher;
+ this.stateStore = stateStore;
this.launchContext = launchContext;
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
@@ -122,6 +133,21 @@ public class ContainerImpl implements Co
stateMachine = stateMachineFactory.make(this);
}
+ /**
+ * Constructor for a container recovered from the NM state store after a
+ * restart. It delegates to the regular constructor, then applies the
+ * recovered state.
+ *
+ * The recovered state drives later behaviour:
+ * - recoveredStatus COMPLETED: init goes straight to DONE.
+ * - recoveredStatus REQUESTED with wasKilled: the container is finished
+ *   without being launched.
+ * - recoveredStatus LAUNCHED: a RECOVER_CONTAINER event is sent instead of
+ *   LAUNCH_CONTAINER.
+ * - wasKilled after launch: a cleanup is issued.
+ *
+ * @param recoveredStatus status recorded in the state store
+ * @param exitCode recorded exit code
+ * @param diagnostics recorded diagnostics, appended to this container's
+ *     diagnostics (NOTE(review): a null here would append the text "null"
+ *     if diagnostics is a StringBuilder; confirm callers never pass null)
+ * @param wasKilled whether a kill had been recorded for the container
+ */
+ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+ Credentials creds, NodeManagerMetrics metrics,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ RecoveredContainerStatus recoveredStatus, int exitCode,
+ String diagnostics, boolean wasKilled) {
+ this(conf, dispatcher, stateStore, launchContext, creds, metrics,
+ containerTokenIdentifier);
+ this.recoveredStatus = recoveredStatus;
+ this.exitCode = exitCode;
+ this.recoveredAsKilled = wasKilled;
+ this.diagnostics.append(diagnostics);
+ }
+
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition();
@@ -135,8 +161,10 @@ public class ContainerImpl implements Co
new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
// From NEW State
.addTransition(ContainerState.NEW,
- EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
- ContainerState.LOCALIZATION_FAILED),
+ EnumSet.of(ContainerState.LOCALIZING,
+ ContainerState.LOCALIZED,
+ ContainerState.LOCALIZATION_FAILED,
+ ContainerState.DONE),
ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
.addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@@ -281,7 +309,9 @@ public class ContainerImpl implements Co
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -295,7 +325,9 @@ public class ContainerImpl implements Co
// we notify container of failed localization if localizer thread (for
// that container) fails for some reason
.addTransition(ContainerState.DONE, ContainerState.DONE,
- ContainerEventType.RESOURCE_FAILED)
+ EnumSet.of(ContainerEventType.RESOURCE_FAILED,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
// create the topology tables
.installTopology();
@@ -420,7 +452,7 @@ public class ContainerImpl implements Co
}
}
- @SuppressWarnings({"fallthrough", "unchecked"})
+ @SuppressWarnings("fallthrough")
private void finished() {
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
@@ -458,7 +490,11 @@ public class ContainerImpl implements Co
}
metrics.releaseContainer(this.resource);
+ sendFinishedEvents();
+ }
+ @SuppressWarnings("unchecked")
+ private void sendFinishedEvents() {
// Inform the application
@SuppressWarnings("rawtypes")
EventHandler eventHandler = dispatcher.getEventHandler();
@@ -471,6 +507,45 @@ public class ContainerImpl implements Co
}
+ /**
+ * Asks the containers launcher to start this container. A container
+ * recovered in LAUNCHED state gets RECOVER_CONTAINER instead of
+ * LAUNCH_CONTAINER, so it is recovered rather than launched a second time.
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
+ private void sendLaunchEvent() {
+ ContainersLauncherEventType launcherEvent =
+ ContainersLauncherEventType.LAUNCH_CONTAINER;
+ if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
+ // try to recover a container that was previously launched
+ launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+ }
+ dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(this, launcherEvent));
+ }
+
+ // Inform the ContainersMonitor to start monitoring the container's
+ // resource usage.
+ // The physical limit is the container's memory converted to bytes (the
+ // *1024*1024 suggests getMemory() is in MB). The virtual limit is that value
+ // times NM_VMEM_PMEM_RATIO.
+ // NOTE(review): if getMemory() returns int, "getMemory() * 1024" is computed
+ // in int arithmetic before the 1024L widens it; confirm sizes stay far below
+ // the overflow point.
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ private void sendContainerMonitorStartEvent() {
+ long pmemBytes = getResource().getMemory() * 1024 * 1024L;
+ float pmemRatio = daemonConf.getFloat(
+ YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ long vmemBytes = (long) (pmemRatio * pmemBytes);
+
+ dispatcher.getEventHandler().handle(
+ new ContainerStartMonitoringEvent(containerId,
+ vmemBytes, pmemBytes));
+ }
+
+ /**
+ * Appends each given string to this container's diagnostics, then persists
+ * the full diagnostics to the NM state store. A store failure is logged as
+ * a warning and not propagated, so the in-memory diagnostics are still
+ * updated.
+ *
+ * @param diags strings to append, in order
+ */
+ private void addDiagnostics(String... diags) {
+ for (String s : diags) {
+ this.diagnostics.append(s);
+ }
+ try {
+ stateStore.storeContainerDiagnostics(containerId, diagnostics);
+ } catch (IOException e) {
+ LOG.warn("Unable to update diagnostics in state store for "
+ + containerId, e);
+ }
+ }
+
+ @SuppressWarnings("unchecked") // dispatcher not typed
public void cleanup() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
new HashMap<LocalResourceVisibility,
@@ -518,6 +593,16 @@ public class ContainerImpl implements Co
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
+ if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
+ container.sendFinishedEvents();
+ return ContainerState.DONE;
+ } else if (container.recoveredAsKilled &&
+ container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
+ // container was killed but never launched
+ container.finished();
+ return ContainerState.DONE;
+ }
+
final ContainerLaunchContext ctxt = container.launchContext;
container.metrics.initingContainer();
@@ -593,9 +678,7 @@ public class ContainerImpl implements Co
new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
- container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.LAUNCH_CONTAINER));
+ container.sendLaunchEvent();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@@ -606,7 +689,6 @@ public class ContainerImpl implements Co
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
- @SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@Override
@@ -626,9 +708,8 @@ public class ContainerImpl implements Co
if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING;
}
- container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.LAUNCH_CONTAINER));
+
+ container.sendLaunchEvent();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@@ -638,24 +719,22 @@ public class ContainerImpl implements Co
* Transition from LOCALIZED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event
*/
- @SuppressWarnings("unchecked") // dispatcher not typed
static class LaunchTransition extends ContainerTransition {
+ @SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
- // Inform the ContainersMonitor to start monitoring the container's
- // resource usage.
- long pmemBytes =
- container.getResource().getMemory() * 1024 * 1024L;
- float pmemRatio = container.daemonConf.getFloat(
- YarnConfiguration.NM_VMEM_PMEM_RATIO,
- YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
- long vmemBytes = (long) (pmemRatio * pmemBytes);
-
- container.dispatcher.getEventHandler().handle(
- new ContainerStartMonitoringEvent(container.containerId,
- vmemBytes, pmemBytes));
+ container.sendContainerMonitorStartEvent();
container.metrics.runningContainer();
container.wasLaunched = true;
+
+ // recoveredAsKilled means a kill had been recorded in the state store
+ // before the NM restarted. Once the container reports as launched again,
+ // issue the cleanup that the earlier kill request called for.
+ if (container.recoveredAsKilled) {
+ LOG.info("Killing " + container.containerId
+ + " due to recovered as killed");
+ container.addDiagnostics("Container recovered as killed.\n");
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.CLEANUP_CONTAINER));
+ }
}
}
@@ -707,8 +786,7 @@ public class ContainerImpl implements Co
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
container.exitCode = exitEvent.getExitCode();
if (exitEvent.getDiagnosticInfo() != null) {
- container.diagnostics.append(exitEvent.getDiagnosticInfo())
- .append('\n');
+ container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
// TODO: Add containerWorkDir to the deletion service.
@@ -735,7 +813,7 @@ public class ContainerImpl implements Co
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
super.transition(container, event);
- container.diagnostics.append("Killed by external signal\n");
+ container.addDiagnostics("Killed by external signal\n");
}
}
@@ -750,9 +828,7 @@ public class ContainerImpl implements Co
ContainerResourceFailedEvent rsrcFailedEvent =
(ContainerResourceFailedEvent) event;
- container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage()
- + "\n");
-
+ container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n");
// Inform the localizer to decrement reference counts and cleanup
// resources.
@@ -775,8 +851,8 @@ public class ContainerImpl implements Co
container.metrics.endInitingContainer();
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.exitCode = killEvent.getContainerExitStatus();
- container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
- container.diagnostics.append("Container is killed before being launched.\n");
+ container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+ container.addDiagnostics("Container is killed before being launched.\n");
}
}
@@ -817,7 +893,7 @@ public class ContainerImpl implements Co
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
ContainerKillEvent killEvent = (ContainerKillEvent) event;
- container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
+ container.addDiagnostics(killEvent.getDiagnostic(), "\n");
container.exitCode = killEvent.getContainerExitStatus();
}
}
@@ -836,8 +912,7 @@ public class ContainerImpl implements Co
}
if (exitEvent.getDiagnosticInfo() != null) {
- container.diagnostics.append(exitEvent.getDiagnosticInfo())
- .append('\n');
+ container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
// The process/process-grp is killed. Decrement reference counts and
@@ -877,8 +952,8 @@ public class ContainerImpl implements Co
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.exitCode = killEvent.getContainerExitStatus();
- container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
- container.diagnostics.append("Container is killed before being launched.\n");
+ container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+ container.addDiagnostics("Container is killed before being launched.\n");
super.transition(container, event);
}
}
@@ -892,8 +967,14 @@ public class ContainerImpl implements Co
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerDiagnosticsUpdateEvent updateEvent =
(ContainerDiagnosticsUpdateEvent) event;
- container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
- .append("\n");
+ container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n");
+ try {
+ container.stateStore.storeContainerDiagnostics(container.containerId,
+ container.diagnostics);
+ } catch (IOException e) {
+ LOG.warn("Unable to update state store diagnostics for "
+ + container.containerId, e);
+ }
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Aug 20 01:34:29 2014
@@ -87,22 +87,23 @@ public class ContainerLaunch implements
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
private static final String PID_FILE_NAME_FMT = "%s.pid";
+ private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
- private final Dispatcher dispatcher;
- private final ContainerExecutor exec;
+ protected final Dispatcher dispatcher;
+ protected final ContainerExecutor exec;
private final Application app;
- private final Container container;
+ protected final Container container;
private final Configuration conf;
private final Context context;
private final ContainerManagerImpl containerManager;
- private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
- private volatile AtomicBoolean completed = new AtomicBoolean(false);
+ protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+ protected AtomicBoolean completed = new AtomicBoolean(false);
private long sleepDelayBeforeSigKill = 250;
private long maxKillWaitTime = 2000;
- private Path pidFilePath = null;
+ protected Path pidFilePath = null;
private final LocalDirsHandlerService dirsHandler;
@@ -223,14 +224,11 @@ public class ContainerLaunch implements
+ Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, false);
- String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
- containerIdStr);
+ String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
- pidFilePath = dirsHandler.getLocalPathForWrite(
- ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
- + pidFileSuffix);
+ pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
@@ -288,6 +286,7 @@ public class ContainerLaunch implements
dispatcher.getEventHandler().handle(new ContainerEvent(
containerID,
ContainerEventType.CONTAINER_LAUNCHED));
+ context.getNMStateStore().storeContainerLaunched(containerID);
// Check if the container is signalled to be killed.
if (!shouldLaunchContainer.compareAndSet(false, true)) {
@@ -310,6 +309,11 @@ public class ContainerLaunch implements
} finally {
completed.set(true);
exec.deactivateContainer(containerID);
+ try {
+ context.getNMStateStore().storeContainerCompleted(containerID, ret);
+ } catch (IOException e) {
+ LOG.error("Unable to set exit code for container " + containerID);
+ }
}
if (LOG.isDebugEnabled()) {
@@ -342,6 +346,11 @@ public class ContainerLaunch implements
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
return 0;
}
+
+  /**
+   * Builds the pid file location for a container as a sub-path that the
+   * caller resolves against the NM local dirs. The file is placed inside the
+   * container's private directory rather than a user-visible one.
+   *
+   * @param appIdStr string form of the owning application id
+   * @param containerIdStr string form of the container id
+   * @return private-dir sub-path of the container's pid file
+   */
+  protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
+    String privateDir = getContainerPrivateDir(appIdStr, containerIdStr);
+    String pidFileName =
+        String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
+    return privateDir + Path.SEPARATOR + pidFileName;
+  }
/**
* Cleanup the container.
@@ -357,6 +366,13 @@ public class ContainerLaunch implements
String containerIdStr = ConverterUtils.toString(containerId);
LOG.info("Cleaning up container " + containerIdStr);
+ try {
+ context.getNMStateStore().storeContainerKilled(containerId);
+ } catch (IOException e) {
+ LOG.error("Unable to mark container " + containerId
+ + " killed in store", e);
+ }
+
// launch flag will be set to true if process already launched
boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
if (!alreadyLaunched) {
@@ -421,6 +437,7 @@ public class ContainerLaunch implements
if (pidFilePath != null) {
FileContext lfs = FileContext.getLocalFSFileContext();
lfs.delete(pidFilePath, false);
+ lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
}
}
}
@@ -479,6 +496,10 @@ public class ContainerLaunch implements
+ appIdStr;
}
+  /**
+   * Exposes the NodeManager {@link Context} held by this launch
+   * (package-private accessor).
+   *
+   * @return the context field of this launch
+   */
+  Context getContext() {
+    return this.context;
+  }
+
@VisibleForTesting
static abstract class ShellScriptBuilder {
public static ShellScriptBuilder create() {
@@ -787,4 +808,7 @@ public class ContainerLaunch implements
}
}
+  /**
+   * Derives the name of the exit-code file that sits next to a pid file by
+   * appending {@code EXIT_CODE_FILE_SUFFIX} to the pid file path.
+   *
+   * @param pidFile path of the container's pid file
+   * @return the pid file path with the exit-code suffix appended
+   */
+  public static String getExitCodeFile(String pidFile) {
+    return String.valueOf(pidFile).concat(EXIT_CODE_FILE_SUFFIX);
+  }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Wed Aug 20 01:34:29 2014
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import com.google.common.annotations.VisibleForTesting;
@@ -107,7 +101,6 @@ public class ContainersLauncher extends
super.serviceStop();
}
- @SuppressWarnings("unchecked")
@Override
public void handle(ContainersLauncherEvent event) {
// TODO: ContainersLauncher launches containers one by one!!
@@ -125,6 +118,14 @@ public class ContainersLauncher extends
containerLauncher.submit(launch);
running.put(containerId, launch);
break;
+ case RECOVER_CONTAINER:
+ app = context.getApplications().get(
+ containerId.getApplicationAttemptId().getApplicationId());
+ launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
+ exec, app, event.getContainer(), dirsHandler, containerManager);
+ containerLauncher.submit(launch);
+ running.put(containerId, launch);
+ break;
case CLEANUP_CONTAINER:
ContainerLaunch launcher = running.remove(containerId);
if (launcher == null) {
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Wed Aug 20 01:34:29 2014
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
public enum ContainersLauncherEventType {
LAUNCH_CONTAINER,
+ RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Aug 20 01:34:29 2014
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
@@ -251,6 +252,7 @@ public class ResourceLocalizationService
cacheCleanupPeriod =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
@@ -341,7 +343,9 @@ public class ResourceLocalizationService
server = createServer();
server.start();
localizationServerAddress =
- getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ getConfig().updateConnectAddr(YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
super.serviceStart();
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Wed Aug 20 01:34:29 2014
@@ -25,5 +25,7 @@ public interface AppLogAggregator extend
void startContainerLogAggregation(ContainerId containerId,
boolean wasContainerSuccessful);
+ void abortLogAggregation();
+
void finishLogAggregation();
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Aug 20 01:34:29 2014
@@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implem
private final BlockingQueue<ContainerId> pendingContainers;
private final AtomicBoolean appFinishing = new AtomicBoolean();
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+ private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
private LogWriter writer = null;
@@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implem
private void doAppLogAggregation() {
ContainerId containerId;
- while (!this.appFinishing.get()) {
+ while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
wait(THREAD_SLEEP_TIME);
@@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implem
}
}
+ if (this.aborted.get()) {
+ return;
+ }
+
// Application is finished. Finish pending-containers
while ((containerId = this.pendingContainers.poll()) != null) {
uploadLogsForContainer(containerId);
@@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implem
this.appFinishing.set(true);
this.notifyAll();
}
+
+  /**
+   * Requests that aggregation stop without uploading logs for containers
+   * still pending. Raises the abort flag and wakes any thread waiting on this
+   * aggregator, so the aggregation loop can notice the flag and return early.
+   */
+  @Override
+  public synchronized void abortLogAggregation() {
+    LOG.info("Aborting log aggregation for " + this.applicationId);
+    aborted.set(true);
+    notifyAll();
+  }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Aug 20 01:34:29 2014
@@ -142,9 +142,17 @@ public class LogAggregationService exten
private void stopAggregators() {
threadPool.shutdown();
+ // if recovery on restart is supported then leave outstanding aggregations
+ // to the next restart
+ boolean shouldAbort = context.getNMStateStore().canRecover()
+ && !context.getDecommissioned();
// politely ask to finish
for (AppLogAggregator aggregator : appLogAggregators.values()) {
- aggregator.finishLogAggregation();
+ if (shouldAbort) {
+ aggregator.abortLogAggregation();
+ } else {
+ aggregator.finishLogAggregation();
+ }
}
while (!threadPool.isTerminated()) { // wait for all threads to finish
for (ApplicationId appId : appLogAggregators.keySet()) {
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -35,19 +35,23 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
-import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
@@ -68,12 +72,17 @@ public class NMLeveldbStateStoreService
private static final String DB_NAME = "yarn-nm-state";
private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
- private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion
+ private static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 0);
private static final String DELETION_TASK_KEY_PREFIX =
"DeletionService/deltask_";
+ private static final String APPLICATIONS_KEY_PREFIX =
+ "ContainerManager/applications/";
+ private static final String FINISHED_APPS_KEY_PREFIX =
+ "ContainerManager/finishedApps/";
+
private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
LOCALIZATION_KEY_PREFIX + "public/";
@@ -84,6 +93,14 @@ public class NMLeveldbStateStoreService
private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
+ private static final String CONTAINERS_KEY_PREFIX =
+ "ContainerManager/containers/";
+ private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
+ private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
+ private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+ private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
+ private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
+
private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
@@ -98,6 +115,8 @@ public class NMLeveldbStateStoreService
private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
+ private static final byte[] EMPTY_VALUE = new byte[0];
+
private DB db;
public NMLeveldbStateStoreService() {
@@ -117,6 +136,246 @@ public class NMLeveldbStateStoreService
@Override
+  public List<RecoveredContainerState> loadContainersState()
+      throws IOException {
+    // Walk every key under CONTAINERS_KEY_PREFIX and rebuild one
+    // RecoveredContainerState per container id encountered.
+    List<RecoveredContainerState> recovered =
+        new ArrayList<RecoveredContainerState>();
+    final int prefixLen = CONTAINERS_KEY_PREFIX.length();
+    LeveldbIterator it = null;
+    try {
+      it = new LeveldbIterator(db);
+      it.seek(bytes(CONTAINERS_KEY_PREFIX));
+
+      // Keys look like CONTAINERS_KEY_PREFIX + <containerId> + "/<suffix>".
+      // loadContainerState consumes the whole run of keys for one container,
+      // so each iteration here starts at the first key of the next container.
+      while (it.hasNext()) {
+        String key = asString(it.peekNext().getKey());
+        if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
+          break; // walked past the containers section
+        }
+        int slash = key.indexOf('/', prefixLen);
+        if (slash < 0) {
+          throw new IOException("Unable to determine container in key: " + key);
+        }
+        ContainerId cid =
+            ConverterUtils.toContainerId(key.substring(prefixLen, slash));
+        recovered.add(loadContainerState(cid, it, key.substring(0, slash + 1)));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+    return recovered;
+  }
+
+  /**
+   * Folds the consecutive entries whose keys start with {@code keyPrefix}
+   * (all keys of one container) into a single recovered state, advancing
+   * {@code iter} past them.
+   *
+   * @param containerId id of the container being recovered
+   * @param iter iterator positioned at the container's first key
+   * @param keyPrefix key prefix for this container, ending with '/'
+   * @return the recovered container state
+   * @throws IOException if a key has an unknown suffix or the stored exit
+   *     code is not a valid integer
+   */
+  private RecoveredContainerState loadContainerState(ContainerId containerId,
+      LeveldbIterator iter, String keyPrefix) throws IOException {
+    RecoveredContainerState rcs = new RecoveredContainerState();
+    // Start at REQUESTED; the launched and exitcode marker keys upgrade it.
+    rcs.status = RecoveredContainerStatus.REQUESTED;
+    while (iter.hasNext()) {
+      Entry<byte[],byte[]> entry = iter.peekNext();
+      String key = asString(entry.getKey());
+      if (!key.startsWith(keyPrefix)) {
+        break;
+      }
+      iter.next();
+
+      String suffix = key.substring(keyPrefix.length() - 1); // start with '/'
+      if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
+        rcs.startRequest = new StartContainerRequestPBImpl(
+            StartContainerRequestProto.parseFrom(entry.getValue()));
+      } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
+        rcs.diagnostics = asString(entry.getValue());
+      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+        // Only upgrade from REQUESTED so a LAUNCHED marker can never
+        // overwrite a COMPLETED status set by the exitcode key.
+        if (rcs.status == RecoveredContainerStatus.REQUESTED) {
+          rcs.status = RecoveredContainerStatus.LAUNCHED;
+        }
+      } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
+        rcs.killed = true;
+      } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
+        rcs.status = RecoveredContainerStatus.COMPLETED;
+        String exitCodeStr = asString(entry.getValue());
+        try {
+          rcs.exitCode = Integer.parseInt(exitCodeStr);
+        } catch (NumberFormatException e) {
+          // Report corrupt state the same way as other bad keys instead of
+          // letting an unchecked exception escape recovery.
+          throw new IOException("Invalid exit code '" + exitCodeStr
+              + "' in container state key: " + key, e);
+        }
+      } else {
+        throw new IOException("Unexpected container state key: " + key);
+      }
+    }
+    return rcs;
+  }
+
+ @Override
+ public void storeContainer(ContainerId containerId,
+ StartContainerRequest startRequest) throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_REQUEST_KEY_SUFFIX;
+ try {
+ db.put(bytes(key),
+ ((StartContainerRequestPBImpl) startRequest).getProto().toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void storeContainerDiagnostics(ContainerId containerId,
+ StringBuilder diagnostics) throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_DIAGS_KEY_SUFFIX;
+ try {
+ db.put(bytes(key), bytes(diagnostics.toString()));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void storeContainerLaunched(ContainerId containerId)
+ throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_LAUNCHED_KEY_SUFFIX;
+ try {
+ db.put(bytes(key), EMPTY_VALUE);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void storeContainerKilled(ContainerId containerId)
+ throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_KILLED_KEY_SUFFIX;
+ try {
+ db.put(bytes(key), EMPTY_VALUE);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+  @Override
+  public void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException {
+    // The exit code is stored as its decimal string form; recovery parses it
+    // back and marks the container COMPLETED.
+    byte[] exitCodeKey = bytes(CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_EXIT_CODE_KEY_SUFFIX);
+    byte[] exitCodeValue = bytes(Integer.toString(exitCode));
+    try {
+      db.put(exitCodeKey, exitCodeValue);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+ @Override
+ public void removeContainer(ContainerId containerId)
+ throws IOException {
+ String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString();
+ try {
+ WriteBatch batch = db.createWriteBatch();
+ try {
+ batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX));
+ batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
+ batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
+ batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
+ batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
+ db.write(batch);
+ } finally {
+ batch.close();
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+  @Override
+  public RecoveredApplicationsState loadApplicationsState()
+      throws IOException {
+    // Read two key ranges with one iterator: first the stored application
+    // protos, then the ids of applications recorded as finished.
+    RecoveredApplicationsState state = new RecoveredApplicationsState();
+    state.applications = new ArrayList<ContainerManagerApplicationProto>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+
+      iter.seek(bytes(APPLICATIONS_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> appEntry = iter.next();
+        if (!asString(appEntry.getKey()).startsWith(APPLICATIONS_KEY_PREFIX)) {
+          break;
+        }
+        state.applications.add(
+            ContainerManagerApplicationProto.parseFrom(appEntry.getValue()));
+      }
+
+      state.finishedApplications = new ArrayList<ApplicationId>();
+      iter.seek(bytes(FINISHED_APPS_KEY_PREFIX));
+      while (iter.hasNext()) {
+        String finishedKey = asString(iter.next().getKey());
+        if (!finishedKey.startsWith(FINISHED_APPS_KEY_PREFIX)) {
+          break;
+        }
+        // The application id is encoded in the key itself.
+        state.finishedApplications.add(ConverterUtils.toApplicationId(
+            finishedKey.substring(FINISHED_APPS_KEY_PREFIX.length())));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return state;
+  }
+
+ @Override
+ public void storeApplication(ApplicationId appId,
+ ContainerManagerApplicationProto p) throws IOException {
+ String key = APPLICATIONS_KEY_PREFIX + appId;
+ try {
+ db.put(bytes(key), p.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+  /**
+   * Records that an application has finished. The application id is encoded
+   * in the key; the value is the shared empty marker, matching the other
+   * presence-only keys in this store.
+   *
+   * @param appId id of the finished application
+   * @throws IOException if the underlying leveldb write fails
+   */
+  @Override
+  public void storeFinishedApplication(ApplicationId appId)
+      throws IOException {
+    String key = FINISHED_APPS_KEY_PREFIX + appId;
+    try {
+      db.put(bytes(key), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeApplication(ApplicationId appId)
+      throws IOException {
+    // Drop both the stored application proto and its finished marker in a
+    // single write batch.
+    try {
+      WriteBatch batch = db.createWriteBatch();
+      try {
+        batch.delete(bytes(APPLICATIONS_KEY_PREFIX + appId));
+        batch.delete(bytes(FINISHED_APPS_KEY_PREFIX + appId));
+        db.write(batch);
+      } finally {
+        batch.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+ @Override
public RecoveredLocalizationState loadLocalizationState()
throws IOException {
RecoveredLocalizationState state = new RecoveredLocalizationState();
@@ -617,14 +876,14 @@ public class NMLeveldbStateStoreService
}
- NMDBSchemaVersion loadVersion() throws IOException {
+ /**
+ * Reads the NM state schema version stored under DB_SCHEMA_VERSION_KEY.
+ * Never returns null: an absent or empty record is reported as version
+ * 1.0, which checkVersion relies on now that its null check is gone.
+ * NOTE(review): unlike the store methods, db.get here is not wrapped in a
+ * DBException-to-IOException conversion; confirm that is intended.
+ */
+ Version loadVersion() throws IOException {
byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) {
- return NMDBSchemaVersion.newInstance(1, 0);
+ return Version.newInstance(1, 0);
}
- NMDBSchemaVersion version =
- new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+ // otherwise the stored bytes are decoded as a serialized VersionProto
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
@@ -634,14 +893,14 @@ public class NMLeveldbStateStoreService
// Only used for test
@VisibleForTesting
- void storeVersion(NMDBSchemaVersion state) throws IOException {
+ // Writes the given schema version to the store by delegating to
+ // dbStoreVersion.
+ void storeVersion(Version state) throws IOException {
dbStoreVersion(state);
}
- private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
+ private void dbStoreVersion(Version state) throws IOException {
String key = DB_SCHEMA_VERSION_KEY;
byte[] data =
- ((NMDBSchemaVersionPBImpl) state).getProto().toByteArray();
+ ((VersionPBImpl) state).getProto().toByteArray();
try {
db.put(bytes(key), data);
} catch (DBException e) {
@@ -649,7 +908,7 @@ public class NMLeveldbStateStoreService
}
}
- NMDBSchemaVersion getCurrentVersion() {
+ /**
+ * Returns CURRENT_VERSION_INFO, the version that checkVersion compares the
+ * loaded schema version against.
+ */
+ Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@@ -664,9 +923,9 @@ public class NMLeveldbStateStoreService
* upgrade NM state or remove incompatible old state.
*/
private void checkVersion() throws IOException {
- NMDBSchemaVersion loadedVersion = loadVersion();
+ Version loadedVersion = loadVersion();
LOG.info("Loaded NM state version info " + loadedVersion);
- if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+ if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -19,13 +19,16 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -43,6 +46,61 @@ public class NMNullStateStoreService ext
}
+ /**
+ * The null store does not support recovery, so loading application state
+ * always throws {@code UnsupportedOperationException}.
+ */
@Override
+ public RecoveredApplicationsState loadApplicationsState() throws IOException {
+ throw new UnsupportedOperationException(
+ "Recovery not supported by this state store");
+ }
+
+ // No-op: the null store does not persist application state.
+ @Override
+ public void storeApplication(ApplicationId appId,
+ ContainerManagerApplicationProto p) throws IOException {
+ }
+
+ // No-op. NOTE(review): unlike the abstract declaration and the sibling
+ // overrides, this omits "throws IOException". That is legal for an
+ // override but inconsistent with the rest of the class.
+ @Override
+ public void storeFinishedApplication(ApplicationId appId) {
+ }
+
+ // No-op: nothing was stored, so there is nothing to remove.
+ @Override
+ public void removeApplication(ApplicationId appId) throws IOException {
+ }
+
+ /**
+ * The null store does not support recovery, so loading container state
+ * always throws {@code UnsupportedOperationException}.
+ */
+ @Override
+ public List<RecoveredContainerState> loadContainersState()
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Recovery not supported by this state store");
+ }
+
+ // No-op: the null store does not persist container start requests.
+ @Override
+ public void storeContainer(ContainerId containerId,
+ StartContainerRequest startRequest) throws IOException {
+ }
+
+ // No-op: diagnostics are not persisted.
+ @Override
+ public void storeContainerDiagnostics(ContainerId containerId,
+ StringBuilder diagnostics) throws IOException {
+ }
+
+ // No-op: launch events are not persisted.
+ @Override
+ public void storeContainerLaunched(ContainerId containerId)
+ throws IOException {
+ }
+
+ // No-op: kill requests are not persisted.
+ @Override
+ public void storeContainerKilled(ContainerId containerId)
+ throws IOException {
+ }
+
+ // No-op: completion and exit codes are not persisted.
+ @Override
+ public void storeContainerCompleted(ContainerId containerId, int exitCode)
+ throws IOException {
+ }
+
+ // No-op: nothing was stored, so there is nothing to remove.
+ @Override
+ public void removeContainer(ContainerId containerId) throws IOException {
+ }
+
+ @Override
public RecoveredLocalizationState loadLocalizationState()
throws IOException {
throw new UnsupportedOperationException(
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Wed Aug 20 01:34:29 2014
@@ -29,10 +29,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -45,6 +48,53 @@ public abstract class NMStateStoreServic
super(name);
}
+ /**
+ * Application state recovered from the store: the saved per-application
+ * protos and the IDs of applications recorded as finished.
+ * Both lists start out empty rather than null, as LocalResourceTrackerState
+ * does for its lists, so callers never need a null check even when no
+ * store has populated this object.
+ */
+ public static class RecoveredApplicationsState {
+ List<ContainerManagerApplicationProto> applications =
+ new ArrayList<ContainerManagerApplicationProto>();
+ List<ApplicationId> finishedApplications =
+ new ArrayList<ApplicationId>();
+
+ /** @return the recovered application protos (initially empty) */
+ public List<ContainerManagerApplicationProto> getApplications() {
+ return applications;
+ }
+
+ /** @return IDs of applications recorded as finished (initially empty) */
+ public List<ApplicationId> getFinishedApplications() {
+ return finishedApplications;
+ }
+ }
+
+ /**
+ * Status of a container as recovered from the state store.
+ * NOTE(review): presumably REQUESTED / LAUNCHED / COMPLETED correspond to
+ * storeContainer / storeContainerLaunched / storeContainerCompleted;
+ * confirm against the store's loadContainersState implementation.
+ */
+ public enum RecoveredContainerStatus {
+ REQUESTED,
+ LAUNCHED,
+ COMPLETED
+ }
+
+ /**
+ * State recovered for a single container.
+ * Field defaults: status and startRequest are null, exitCode is
+ * ContainerExitStatus.INVALID, killed is false and diagnostics is "".
+ */
+ public static class RecoveredContainerState {
+ RecoveredContainerStatus status;
+ int exitCode = ContainerExitStatus.INVALID;
+ // presumably set when a kill request was recorded via storeContainerKilled
+ boolean killed = false;
+ String diagnostics = "";
+ StartContainerRequest startRequest;
+
+ public RecoveredContainerStatus getStatus() {
+ return status;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public boolean getKilled() {
+ return killed;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public StartContainerRequest getStartRequest() {
+ return startRequest;
+ }
+ }
+
public static class LocalResourceTrackerState {
List<LocalizedResourceProto> localizedResources =
new ArrayList<LocalizedResourceProto>();
@@ -163,6 +213,100 @@ public abstract class NMStateStoreServic
/**
+ * Load the state of applications.
+ * Stores that do not support recovery may throw
+ * {@code UnsupportedOperationException}.
+ * @return recovered state for applications
+ * @throws IOException if the state cannot be read from the store
+ */
+ public abstract RecoveredApplicationsState loadApplicationsState()
+ throws IOException;
+
+ /**
+ * Record the start of an application.
+ * @param appId the application ID
+ * @param p state to store for the application
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void storeApplication(ApplicationId appId,
+ ContainerManagerApplicationProto p) throws IOException;
+
+ /**
+ * Record that an application has finished.
+ * @param appId the application ID
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void storeFinishedApplication(ApplicationId appId)
+ throws IOException;
+
+ /**
+ * Remove records corresponding to an application.
+ * @param appId the application ID
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void removeApplication(ApplicationId appId)
+ throws IOException;
+
+
+ /**
+ * Load the state of containers.
+ * Stores that do not support recovery may throw
+ * {@code UnsupportedOperationException}.
+ * @return recovered state for containers
+ * @throws IOException if the state cannot be read from the store
+ */
+ public abstract List<RecoveredContainerState> loadContainersState()
+ throws IOException;
+
+ /**
+ * Record a container start request.
+ * @param containerId the container ID
+ * @param startRequest the container start request
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void storeContainer(ContainerId containerId,
+ StartContainerRequest startRequest) throws IOException;
+
+ /**
+ * Record that a container has been launched.
+ * @param containerId the container ID
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void storeContainerLaunched(ContainerId containerId)
+ throws IOException;
+
+ /**
+ * Record that a container has completed.
+ * @param containerId the container ID
+ * @param exitCode the exit code from the container
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void storeContainerCompleted(ContainerId containerId,
+ int exitCode) throws IOException;
+
+ /**
+ * Record a request to kill a container.
+ * @param containerId the container ID
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void storeContainerKilled(ContainerId containerId)
+ throws IOException;
+
+ /**
+ * Record diagnostics for a container.
+ * @param containerId the container ID
+ * @param diagnostics the container diagnostics
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void storeContainerDiagnostics(ContainerId containerId,
+ StringBuilder diagnostics) throws IOException;
+
+ /**
+ * Remove records corresponding to a container.
+ * @param containerId the container ID
+ * @throws IOException if the state store operation fails
+ */
+ public abstract void removeContainer(ContainerId containerId)
+ throws IOException;
+
+
+ /**
* Load the state of localized resources
* @return recovered localized resource state
* @throws IOException
@@ -203,43 +347,111 @@ public abstract class NMStateStoreServic
ApplicationId appId, Path localPath) throws IOException;
+ /**
+ * Load the state of the deletion service.
+ * @return recovered deletion service state
+ * @throws IOException if the state cannot be read from the store
+ */
public abstract RecoveredDeletionServiceState loadDeletionServiceState()
throws IOException;
+ /**
+ * Record a deletion task.
+ * @param taskId the deletion task ID
+ * @param taskProto the deletion task protobuf
+ * @throws IOException if the state store operation fails
+ */
public abstract void storeDeletionTask(int taskId,
DeletionServiceDeleteTaskProto taskProto) throws IOException;
+ /**
+ * Remove records corresponding to a deletion task.
+ * @param taskId the deletion task ID
+ * @throws IOException if the state store operation fails
+ */
public abstract void removeDeletionTask(int taskId) throws IOException;
+ /**
+ * Load the state of NM tokens.
+ * @return recovered state of NM tokens
+ * @throws IOException if the state cannot be read from the store
+ */
public abstract RecoveredNMTokensState loadNMTokensState()
throws IOException;
+ /**
+ * Record the current NM token master key.
+ * @param key the master key
+ * @throws IOException if the state store operation fails
+ */
public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record the previous NM token master key.
+ * @param key the previous master key
+ * @throws IOException if the state store operation fails
+ */
public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record the NM token master key for an application attempt.
+ * @param attempt the application attempt ID
+ * @param key the master key
+ * @throws IOException if the state store operation fails
+ */
public abstract void storeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt, MasterKey key) throws IOException;
+ /**
+ * Remove the NM token master key recorded for an application attempt.
+ * @param attempt the application attempt ID
+ * @throws IOException if the state store operation fails
+ */
public abstract void removeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt) throws IOException;
+ /**
+ * Load the state of container tokens.
+ * @return recovered state of container tokens
+ * @throws IOException if the state cannot be read from the store
+ */
public abstract RecoveredContainerTokensState loadContainerTokensState()
throws IOException;
+ /**
+ * Record the current container token master key.
+ * @param key the master key
+ * @throws IOException if the state store operation fails
+ */
public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record the previous container token master key.
+ * @param key the previous master key
+ * @throws IOException if the state store operation fails
+ */
public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
throws IOException;
+ /**
+ * Record the expiration time for a container token.
+ * @param containerId the container ID
+ * @param expirationTime the container token expiration time
+ * (NOTE(review): units not stated here, presumably epoch
+ * milliseconds; confirm with the token secret manager)
+ * @throws IOException if the state store operation fails
+ */
public abstract void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException;
+ /**
+ * Remove records for a container token.
+ * @param containerId the container ID
+ * @throws IOException if the state store operation fails
+ */
public abstract void removeContainerToken(ContainerId containerId)
throws IOException;
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Wed Aug 20 01:34:29 2014
@@ -55,7 +55,9 @@ public class WebServer extends AbstractS
@Override
protected void serviceStart() throws Exception {
- String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig());
+ String bindAddress = WebAppUtils.getWebAppBindURL(getConfig(),
+ YarnConfiguration.NM_BIND_HOST,
+ WebAppUtils.getNMWebAppURLWithoutScheme(getConfig()));
LOG.info("Instantiating NMWebApp at " + bindAddress);
try {