You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC
svn commit: r1513258 [5/10] - in
/hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-...
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Mon Aug 12 21:25:49 2013
@@ -23,7 +23,10 @@ import static org.apache.hadoop.service.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +40,7 @@ import org.apache.hadoop.io.DataInputByt
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -47,23 +51,25 @@ import org.apache.hadoop.service.Service
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -79,6 +85,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -125,8 +132,6 @@ public class ContainerManagerImpl extend
private final NodeStatusUpdater nodeStatusUpdater;
- private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
protected LocalDirsHandlerService dirsHandler;
protected final AsyncDispatcher dispatcher;
private final ApplicationACLsManager aclsManager;
@@ -166,7 +171,6 @@ public class ContainerManagerImpl extend
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
-
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class,
@@ -228,6 +232,13 @@ public class ContainerManagerImpl extend
// Enqueue user dirs in deletion context
Configuration conf = getConfig();
+ Configuration serverConf = new Configuration(conf);
+
+ // always enforce it to be token-based.
+ serverConf.set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ SaslRpcServer.AuthMethod.TOKEN.toString());
+
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress initialAddress = conf.getSocketAddr(
@@ -236,8 +247,8 @@ public class ContainerManagerImpl extend
YarnConfiguration.DEFAULT_NM_PORT);
server =
- rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, conf,
- this.context.getNMTokenSecretManager(),
+ rpc.getServer(ContainerManagementProtocol.class, this, initialAddress,
+ serverConf, this.context.getNMTokenSecretManager(),
conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
@@ -247,7 +258,7 @@ public class ContainerManagerImpl extend
false)) {
refreshServiceAcls(conf, new NMPolicyProvider());
}
-
+
LOG.info("Blocking new container-requests as container manager rpc" +
" server is still starting.");
this.setBlockNewContainerRequests(true);
@@ -312,18 +323,25 @@ public class ContainerManagerImpl extend
return resultId;
}
+ protected void authorizeUser(UserGroupInformation remoteUgi,
+ NMTokenIdentifier nmTokenIdentifier) throws YarnException {
+ if (!remoteUgi.getUserName().equals(
+ nmTokenIdentifier.getApplicationAttemptId().toString())) {
+ throw RPCUtil.getRemoteException("Expected applicationAttemptId: "
+ + remoteUgi.getUserName() + "Found: "
+ + nmTokenIdentifier.getApplicationAttemptId());
+ }
+ }
+
/**
* @param containerTokenIdentifier
* of the container to be started
- * @param ugi
- * ugi corresponding to the remote end making the api-call
* @throws YarnException
*/
@Private
@VisibleForTesting
protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier,
- UserGroupInformation ugi) throws YarnException {
+ ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
ContainerId containerId = containerTokenIdentifier.getContainerID();
String containerIDStr = containerId.toString();
@@ -338,14 +356,8 @@ public class ContainerManagerImpl extend
.append(" was used for starting container with container token")
.append(" issued for application attempt : ")
.append(containerId.getApplicationAttemptId());
- } else if (!ugi.getUserName().equals(
- nmTokenIdentifier.getApplicationAttemptId().toString())) {
- unauthorized = true;
- messageBuilder.append("\nExpected applicationAttemptId: ")
- .append(ugi.getUserName()).append(" Found: ")
- .append(nmTokenIdentifier.getApplicationAttemptId().toString());
} else if (!this.context.getContainerTokenSecretManager()
- .isValidStartContainerRequest(containerId)) {
+ .isValidStartContainerRequest(containerTokenIdentifier)) {
// Is the container being relaunched? Or RPC layer let startCall with
// tokens generated off old-secret through?
unauthorized = true;
@@ -359,7 +371,6 @@ public class ContainerManagerImpl extend
.append(System.currentTimeMillis()).append(" found ")
.append(containerTokenIdentifier.getExpiryTimeStamp());
}
-
if (unauthorized) {
String msg = messageBuilder.toString();
LOG.error(msg);
@@ -368,18 +379,53 @@ public class ContainerManagerImpl extend
}
/**
- * Start a container on this NodeManager.
+ * Start a list of containers on this NodeManager.
*/
- @SuppressWarnings("unchecked")
@Override
- public StartContainerResponse startContainer(StartContainerRequest request)
- throws YarnException, IOException {
-
+ public StartContainersResponse
+ startContainers(StartContainersRequest requests) throws YarnException,
+ IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting new containers as NodeManager has not"
+ " yet connected with ResourceManager");
}
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+ authorizeUser(remoteUgi,nmTokenIdentifier);
+ List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
+ Map<ContainerId, SerializedException> failedContainers =
+ new HashMap<ContainerId, SerializedException>();
+ for (StartContainerRequest request : requests.getStartContainerRequests()) {
+ ContainerId containerId = null;
+ try {
+ ContainerTokenIdentifier containerTokenIdentifier =
+ BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
+ verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
+ containerTokenIdentifier);
+ containerId = containerTokenIdentifier.getContainerID();
+ startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
+ request);
+ succeededContainers.add(containerId);
+ } catch (YarnException e) {
+ failedContainers.put(containerId, SerializedException.newInstance(e));
+ } catch (InvalidToken ie) {
+ failedContainers.put(containerId, SerializedException.newInstance(ie));
+ throw ie;
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
+ }
+ }
+
+ return StartContainersResponse.newInstance(auxiliaryServices.getMetaData(),
+ succeededContainers, failedContainers);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ StartContainerRequest request) throws YarnException, IOException {
+
/*
* 1) It should save the NMToken into NMTokenSecretManager. This is done
* here instead of RPC layer because at the time of opening/authenticating
@@ -391,18 +437,8 @@ public class ContainerManagerImpl extend
* belongs to correct Node Manager (part of retrieve password). c) It has
* correct RMIdentifier. d) It is not expired.
*/
- // update NMToken
-
- UserGroupInformation remoteUgi = getRemoteUgi();
- NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
-
- // Validate containerToken
- ContainerTokenIdentifier containerTokenIdentifier =
- verifyAndGetContainerTokenIdentifier(request.getContainerToken());
-
- authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier,
- remoteUgi);
-
+ authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
+
if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater
.getRMIdentifier()) {
// Is the container coming from unknown RM
@@ -411,9 +447,9 @@ public class ContainerManagerImpl extend
.append(" rejected as it is allocated by a previous RM");
throw new InvalidContainerException(sb.toString());
}
-
+ // update NMToken
updateNMTokenIdentifier(nmTokenIdentifier);
-
+
ContainerId containerId = containerTokenIdentifier.getContainerID();
String containerIdStr = containerId.toString();
String user = containerTokenIdentifier.getApplicationSubmitter();
@@ -457,26 +493,16 @@ public class ContainerManagerImpl extend
containerTokenIdentifier);
NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
"ContainerManageImpl", applicationID, containerId);
- StartContainerResponse response =
- recordFactory.newRecordInstance(StartContainerResponse.class);
- response.setAllServicesMetaData(auxiliaryServices.getMetaData());
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers.
metrics.launchedContainer();
- metrics.allocateContainer(containerTokenIdentifier.getResource());
- return response;
+ metrics.allocateContainer(containerTokenIdentifier.getResource());
}
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
- org.apache.hadoop.yarn.api.records.Token token) throws YarnException,
+ org.apache.hadoop.yarn.api.records.Token token,
+ ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,
InvalidToken {
- ContainerTokenIdentifier containerTokenIdentifier = null;
- try {
- containerTokenIdentifier =
- BuilderUtils.newContainerTokenIdentifier(token);
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
- }
byte[] password =
context.getContainerTokenSecretManager().retrievePassword(
containerTokenIdentifier);
@@ -524,64 +550,110 @@ public class ContainerManagerImpl extend
}
/**
- * Stop the container running on this NodeManager.
+ * Stop a list of containers running on this NodeManager.
*/
@Override
- @SuppressWarnings("unchecked")
- public StopContainerResponse stopContainer(StopContainerRequest request)
+ public StopContainersResponse stopContainers(StopContainersRequest requests)
throws YarnException, IOException {
- ContainerId containerID = request.getContainerId();
+ List<ContainerId> succeededRequests = new ArrayList<ContainerId>();
+ Map<ContainerId, SerializedException> failedRequests =
+ new HashMap<ContainerId, SerializedException>();
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+ for (ContainerId id : requests.getContainerIds()) {
+ try {
+ stopContainerInternal(identifier, id);
+ succeededRequests.add(id);
+ } catch (YarnException e) {
+ failedRequests.put(id, SerializedException.newInstance(e));
+ }
+ }
+ return StopContainersResponse
+ .newInstance(succeededRequests, failedRequests);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+ ContainerId containerID) throws YarnException {
String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID);
- LOG.info("Getting container-status for " + containerIDStr);
- authorizeGetAndStopContainerRequest(containerID, container, true);
-
- StopContainerResponse response =
- recordFactory.newRecordInstance(StopContainerResponse.class);
-
- dispatcher.getEventHandler().handle(
- new ContainerKillEvent(containerID,
- "Container killed by the ApplicationMaster."));
+ LOG.info("Stopping container with container Id: " + containerIDStr);
+ authorizeGetAndStopContainerRequest(containerID, container, true,
+ nmTokenIdentifier);
- NMAuditLogger.logSuccess(container.getUser(),
- AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
- .getApplicationAttemptId().getApplicationId(), containerID);
-
- // TODO: Move this code to appropriate place once kill_container is
- // implemented.
- nodeStatusUpdater.sendOutofBandHeartBeat();
+ if (container == null) {
+ if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
+ + " is not handled by this NodeManager");
+ }
+ } else {
+ dispatcher.getEventHandler().handle(
+ new ContainerKillEvent(containerID,
+ "Container killed by the ApplicationMaster."));
- return response;
+ NMAuditLogger.logSuccess(container.getUser(),
+ AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
+ .getApplicationAttemptId().getApplicationId(), containerID);
+
+ // TODO: Move this code to appropriate place once kill_container is
+ // implemented.
+ nodeStatusUpdater.sendOutofBandHeartBeat();
+ }
}
+ /**
+ * Get a list of container statuses running on this NodeManager
+ */
@Override
- public GetContainerStatusResponse getContainerStatus(
- GetContainerStatusRequest request) throws YarnException, IOException {
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request) throws YarnException, IOException {
- ContainerId containerID = request.getContainerId();
+ List<ContainerStatus> succeededRequests = new ArrayList<ContainerStatus>();
+ Map<ContainerId, SerializedException> failedRequests =
+ new HashMap<ContainerId, SerializedException>();
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+ for (ContainerId id : request.getContainerIds()) {
+ try {
+ ContainerStatus status = getContainerStatusInternal(id, identifier);
+ succeededRequests.add(status);
+ } catch (YarnException e) {
+ failedRequests.put(id, SerializedException.newInstance(e));
+ }
+ }
+ return GetContainerStatusesResponse.newInstance(succeededRequests,
+ failedRequests);
+ }
+
+ private ContainerStatus getContainerStatusInternal(ContainerId containerID,
+ NMTokenIdentifier nmTokenIdentifier) throws YarnException {
String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID);
LOG.info("Getting container-status for " + containerIDStr);
- authorizeGetAndStopContainerRequest(containerID, container, false);
+ authorizeGetAndStopContainerRequest(containerID, container, false,
+ nmTokenIdentifier);
+ if (container == null) {
+ if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
+ + " was recently stopped on node manager.");
+ } else {
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
+ + " is not handled by this NodeManager");
+ }
+ }
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
LOG.info("Returning " + containerStatus);
- GetContainerStatusResponse response =
- recordFactory.newRecordInstance(GetContainerStatusResponse.class);
- response.setStatus(containerStatus);
- return response;
+ return containerStatus;
}
@Private
@VisibleForTesting
protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
- Container container, boolean stopRequest) throws YarnException {
-
- UserGroupInformation remoteUgi = getRemoteUgi();
- NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
-
+ Container container, boolean stopRequest, NMTokenIdentifier identifier)
+ throws YarnException {
/*
* For get/stop container status; we need to verify that 1) User (NMToken)
* application attempt only has started container. 2) Requested containerId
@@ -603,17 +675,11 @@ public class ContainerManagerImpl extend
container.getContainerId());
} else {
LOG.warn(identifier.getApplicationAttemptId()
- + " attempted to get get status for non-application container : "
+ + " attempted to get status for non-application container : "
+ container.getContainerId().toString());
}
- throw RPCUtil.getRemoteException("Container " + containerId.toString()
- + " is not started by this application attempt.");
}
- if (container == null) {
- throw RPCUtil.getRemoteException("Container " + containerId.toString()
- + " is not handled by this NodeManager");
- }
}
class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Mon Aug 12 21:25:49 2013
@@ -392,9 +392,6 @@ public class ApplicationImpl implements
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
- // Inform the ContainerTokenSecretManager
- app.context.getContainerTokenSecretManager().appFinished(app.appId);
-
// Inform the logService
app.dispatcher.getEventHandler().handle(
new LogHandlerAppFinishedEvent(app.appId));
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Mon Aug 12 21:25:49 2013
@@ -103,7 +103,7 @@ public class ContainerImpl implements Co
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
- ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
+ ContainerTokenIdentifier containerTokenIdentifier) {
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.launchContext = launchContext;
@@ -290,6 +290,11 @@ public class ContainerImpl implements Co
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
+ // This transition may result when
+ // we notify container of failed localization if localizer thread (for
+ // that container) fails for some reason
+ .addTransition(ContainerState.DONE, ContainerState.DONE,
+ ContainerEventType.RESOURCE_FAILED)
// create the topology tables
.installTopology();
@@ -330,8 +335,11 @@ public class ContainerImpl implements Co
public Map<Path,List<String>> getLocalizedResources() {
this.readLock.lock();
try {
- assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
- return localizedResources;
+ if (ContainerState.LOCALIZED == getContainerState()) {
+ return localizedResources;
+ } else {
+ return null;
+ }
} finally {
this.readLock.unlock();
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Mon Aug 12 21:25:49 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
@@ -119,14 +121,20 @@ public class ContainerLaunch implements
@SuppressWarnings("unchecked") // dispatcher not typed
public Integer call() {
final ContainerLaunchContext launchContext = container.getLaunchContext();
- final Map<Path,List<String>> localResources =
- container.getLocalizedResources();
+ Map<Path,List<String>> localResources = null;
ContainerId containerID = container.getContainerId();
String containerIdStr = ConverterUtils.toString(containerID);
final List<String> command = launchContext.getCommands();
int ret = -1;
try {
+ localResources = container.getLocalizedResources();
+ if (localResources == null) {
+ RPCUtil.getRemoteException(
+ "Unable to get local resources when Container " + containerID +
+ " is at " + container.getContainerState());
+ }
+
final String user = container.getUser();
// /////////////////////////// Variable expansion
// Before the container script gets written out.
@@ -308,6 +316,7 @@ public class ContainerLaunch implements
* the process id is available.
* @throws IOException
*/
+ @SuppressWarnings("unchecked") // dispatcher not typed
public void cleanupContainer() throws IOException {
ContainerId containerId = container.getContainerId();
String containerIdStr = ConverterUtils.toString(containerId);
@@ -355,13 +364,17 @@ public class ContainerLaunch implements
+ " as user " + user
+ " for container " + containerIdStr
+ ", result=" + (result? "success" : "failed"));
- new DelayedProcessKiller(user,
+ new DelayedProcessKiller(container, user,
processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
}
}
} catch (Exception e) {
- LOG.warn("Got error when trying to cleanup container " + containerIdStr
- + ", error=" + e.getMessage());
+ String message =
+ "Exception when trying to cleanup container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.warn(message);
+ dispatcher.getEventHandler().handle(
+ new ContainerDiagnosticsUpdateEvent(containerId, message));
} finally {
// cleanup pid file if present
if (pidFilePath != null) {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Mon Aug 12 21:25:49 2013
@@ -37,12 +37,17 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -61,7 +66,8 @@ public class ContainersLauncher extends
private final Dispatcher dispatcher;
private LocalDirsHandlerService dirsHandler;
- private final ExecutorService containerLauncher =
+ @VisibleForTesting
+ public ExecutorService containerLauncher =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("ContainersLauncher #%d")
@@ -107,6 +113,7 @@ public class ContainersLauncher extends
super.serviceStop();
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(ContainersLauncherEvent event) {
// TODO: ContainersLauncher launches containers one by one!!
@@ -134,9 +141,18 @@ public class ContainersLauncher extends
Future<Integer> rContainer = rContainerDatum.runningcontainer;
if (rContainer != null
&& !rContainer.isDone()) {
- // Cancel the future so that it won't be launched
- // if it isn't already.
- rContainer.cancel(false);
+ // Cancel the future so that it won't be launched if it isn't already.
+ // If it is going to be canceled, make sure CONTAINER_KILLED_ON_REQUEST
+ // will not be missed if the container is already at KILLING
+ if (rContainer.cancel(false)) {
+ if (container.getContainerState() == ContainerState.KILLING) {
+ dispatcher.getEventHandler().handle(
+ new ContainerExitEvent(containerId,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ ExitCode.TERMINATED.getExitCode(),
+ "Container terminated before launch."));
+ }
+ }
}
// Cleanup a container whether it is running/killed/completed, so that
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Mon Aug 12 21:25:49 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -288,11 +289,10 @@ public class ContainerLocalizer {
stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
} catch (ExecutionException e) {
stat.setStatus(ResourceStatusType.FETCH_FAILURE);
- stat.setException(
- YarnServerBuilderUtils.newSerializedException(e.getCause()));
+ stat.setException(SerializedException.newInstance(e.getCause()));
} catch (CancellationException e) {
stat.setStatus(ResourceStatusType.FETCH_FAILURE);
- stat.setException(YarnServerBuilderUtils.newSerializedException(e));
+ stat.setException(SerializedException.newInstance(e));
}
// TODO shouldn't remove until ACK
i.remove();
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Mon Aug 12 21:25:49 2013
@@ -242,6 +242,7 @@ class LocalResourcesTrackerImpl implemen
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
+ LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
return true;
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Mon Aug 12 21:25:49 2013
@@ -112,12 +112,17 @@ public class LocalizedResource implement
.append(getState() == ResourceState.LOCALIZED
? getLocalPath() + "," + getSize()
: "pending").append(",[");
- for (ContainerId c : ref) {
- sb.append("(").append(c.toString()).append(")");
+ try {
+ this.readLock.lock();
+ for (ContainerId c : ref) {
+ sb.append("(").append(c.toString()).append(")");
+ }
+ sb.append("],").append(getTimestamp()).append(",").append(getState())
+ .append("}");
+ return sb.toString();
+ } finally {
+ this.readLock.unlock();
}
- sb.append("],").append(getTimestamp()).append(",")
- .append(getState()).append("}");
- return sb.toString();
}
private void release(ContainerId container) {
@@ -188,8 +193,8 @@ public class LocalizedResource implement
LOG.warn("Can't handle this event at current state", e);
}
if (oldState != newState) {
- LOG.info("Resource " + resourcePath + " transitioned from "
- + oldState
+ LOG.info("Resource " + resourcePath + (localPath != null ?
+ "(->" + localPath + ")": "") + " transitioned from " + oldState
+ " to " + newState);
}
} finally {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Mon Aug 12 21:25:49 2013
@@ -29,6 +29,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -629,23 +631,16 @@ public class ResourceLocalizationService
final Configuration conf;
final ExecutorService threadPool;
final CompletionService<Path> queue;
+ // It's shared between the public localizer and the dispatcher thread.
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
PublicLocalizer(Configuration conf) {
- this(conf, getLocalFileContext(conf),
- createLocalizerExecutor(conf),
- new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
- }
-
- PublicLocalizer(Configuration conf, FileContext lfs,
- ExecutorService threadPool,
- Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
super("Public Localizer");
- this.lfs = lfs;
+ this.lfs = getLocalFileContext(conf);
this.conf = conf;
- this.pending = pending;
-
- this.threadPool = threadPool;
+ this.pending =
+ new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
+ this.threadPool = createLocalizerExecutor(conf);
this.queue = new ExecutorCompletionService<Path>(threadPool);
}
@@ -747,6 +742,7 @@ public class ResourceLocalizationService
final LocalizerContext context;
final String localizerId;
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
+ // It's a shared list between the Private Localizer and the dispatcher thread.
final List<LocalizerResourceRequestEvent> pending;
// TODO: threadsafe, use outer?
@@ -757,13 +753,14 @@ public class ResourceLocalizationService
super("LocalizerRunner for " + localizerId);
this.context = context;
this.localizerId = localizerId;
- this.pending = new ArrayList<LocalizerResourceRequestEvent>();
+ this.pending =
+ Collections
+ .synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
this.scheduled =
new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
}
public void addResource(LocalizerResourceRequestEvent request) {
- // TDOO: Synchronization
pending.add(request);
}
@@ -773,43 +770,44 @@ public class ResourceLocalizationService
* @return
*/
private LocalResource findNextResource() {
- // TODO: Synchronization
- for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
- i.hasNext();) {
- LocalizerResourceRequestEvent evt = i.next();
- LocalizedResource nRsrc = evt.getResource();
- // Resource download should take place ONLY if resource is in
- // Downloading state
- if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
- i.remove();
- continue;
- }
- /*
- * Multiple containers will try to download the same resource. So the
- * resource download should start only if
- * 1) We can acquire a non blocking semaphore lock on resource
- * 2) Resource is still in DOWNLOADING state
- */
- if (nRsrc.tryAcquire()) {
- if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
- LocalResourceRequest nextRsrc = nRsrc.getRequest();
- LocalResource next =
- recordFactory.newRecordInstance(LocalResource.class);
- next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
- .getPath()));
- next.setTimestamp(nextRsrc.getTimestamp());
- next.setType(nextRsrc.getType());
- next.setVisibility(evt.getVisibility());
- next.setPattern(evt.getPattern());
- scheduled.put(nextRsrc, evt);
- return next;
- } else {
- // Need to release acquired lock
- nRsrc.unlock();
- }
- }
+ synchronized (pending) {
+ for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
+ i.hasNext();) {
+ LocalizerResourceRequestEvent evt = i.next();
+ LocalizedResource nRsrc = evt.getResource();
+ // Resource download should take place ONLY if resource is in
+ // Downloading state
+ if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
+ i.remove();
+ continue;
+ }
+ /*
+ * Multiple containers will try to download the same resource. So the
+ * resource download should start only if
+ * 1) We can acquire a non blocking semaphore lock on resource
+ * 2) Resource is still in DOWNLOADING state
+ */
+ if (nRsrc.tryAcquire()) {
+ if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+ LocalResourceRequest nextRsrc = nRsrc.getRequest();
+ LocalResource next =
+ recordFactory.newRecordInstance(LocalResource.class);
+ next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
+ .getPath()));
+ next.setTimestamp(nextRsrc.getTimestamp());
+ next.setType(nextRsrc.getType());
+ next.setVisibility(evt.getVisibility());
+ next.setPattern(evt.getPattern());
+ scheduled.put(nextRsrc, evt);
+ return next;
+ } else {
+ // Need to release acquired lock
+ nRsrc.unlock();
+ }
+ }
+ }
+ return null;
}
- return null;
}
LocalizerHeartbeatResponse update(
@@ -1094,7 +1092,8 @@ public class ResourceLocalizationService
try {
if (status.getPath().getName().matches(".*" +
ContainerLocalizer.USERCACHE + "_DEL_.*")) {
- cleanUpFilesFromSubDir(lfs, del, status.getPath());
+ LOG.info("usercache path : " + status.getPath().toString());
+ cleanUpFilesPerUserDir(lfs, del, status.getPath());
} else if (status.getPath().getName()
.matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
||
@@ -1111,17 +1110,28 @@ public class ResourceLocalizationService
}
}
- private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
- Path dirPath) throws IOException {
- RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
- if (fileStatus != null) {
- while (fileStatus.hasNext()) {
- FileStatus status = fileStatus.next();
+ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
+ Path userDirPath) throws IOException {
+ RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
+ FileDeletionTask dependentDeletionTask =
+ del.createFileDeletionTask(null, userDirPath, new Path[] {});
+ if (userDirStatus != null) {
+ List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+ while (userDirStatus.hasNext()) {
+ FileStatus status = userDirStatus.next();
String owner = status.getOwner();
- del.delete(owner, status.getPath(), new Path[] {});
+ FileDeletionTask deletionTask =
+ del.createFileDeletionTask(owner, null,
+ new Path[] { status.getPath() });
+ deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ deletionTasks.add(deletionTask);
+ }
+ for (FileDeletionTask task : deletionTasks) {
+ del.scheduleFileDeletionTask(task);
}
+ } else {
+ del.scheduleFileDeletionTask(dependentDeletionTask);
}
- del.delete(null, dirPath, new Path[] {});
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java Mon Aug 12 21:25:49 2013
@@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager.security;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -48,14 +48,15 @@ public class NMContainerTokenSecretManag
.getLog(NMContainerTokenSecretManager.class);
private MasterKeyData previousMasterKey;
+ private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
+
- private final Map<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>> oldMasterKeys;
private String nodeHostAddr;
public NMContainerTokenSecretManager(Configuration conf) {
super(conf);
- this.oldMasterKeys =
- new HashMap<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>>();
+ recentlyStartedContainerTracker =
+ new TreeMap<Long, List<ContainerId>>();
}
/**
@@ -93,9 +94,6 @@ public class NMContainerTokenSecretManag
public synchronized byte[] retrievePassword(
ContainerTokenIdentifier identifier) throws SecretManager.InvalidToken {
int keyId = identifier.getMasterKeyId();
- ContainerId containerId = identifier.getContainerID();
- ApplicationId appId =
- containerId.getApplicationAttemptId().getApplicationId();
MasterKeyData masterKeyToUse = null;
if (this.previousMasterKey != null
@@ -107,19 +105,6 @@ public class NMContainerTokenSecretManag
// A container-launch has come in with a token generated off the current
// master-key
masterKeyToUse = super.currentMasterKey;
- } else if (this.oldMasterKeys.containsKey(appId)
- && this.oldMasterKeys.get(appId).containsKey(containerId)) {
- // This means on the following happened:
- // (1) a stopContainer() or a getStatus() happened for a container with
- // token generated off a master-key that is neither current nor the
- // previous one.
- // (2) a container-relaunch has come in with a token generated off a
- // master-key that is neither current nor the previous one.
- // This basically lets stop and getStatus() calls with old-tokens to pass
- // through without any issue, i.e. (1).
- // Start-calls for repetitive launches (2) also pass through RPC here, but
- // get thwarted at the app-layer as part of startContainer() call.
- masterKeyToUse = this.oldMasterKeys.get(appId).get(containerId);
}
if (nodeHostAddr != null
@@ -143,61 +128,64 @@ public class NMContainerTokenSecretManag
}
/**
- * Container start has gone through. Store the corresponding keys so that
- * stopContainer() and getContainerStatus() can be authenticated long after
- * the container-start went through.
+ * Container start has gone through. We need to store the containerId in order
+ * to block future container start requests with the same container token. The
+ * containerId needs to be kept until its container token expires.
*/
public synchronized void startContainerSuccessful(
ContainerTokenIdentifier tokenId) {
- int keyId = tokenId.getMasterKeyId();
- if (currentMasterKey.getMasterKey().getKeyId() == keyId) {
- addKeyForContainerId(tokenId.getContainerID(), currentMasterKey);
- } else if (previousMasterKey != null
- && previousMasterKey.getMasterKey().getKeyId() == keyId) {
- addKeyForContainerId(tokenId.getContainerID(), previousMasterKey);
+
+ removeAnyContainerTokenIfExpired();
+
+ Long expTime = tokenId.getExpiryTimeStamp();
+ // We might have multiple containers with same expiration time.
+ if (!recentlyStartedContainerTracker.containsKey(expTime)) {
+ recentlyStartedContainerTracker
+ .put(expTime, new ArrayList<ContainerId>());
+ }
+ recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
+
+ }
+
+ protected synchronized void removeAnyContainerTokenIfExpired() {
+ // Trying to remove any container if its container token has expired.
+ Iterator<Entry<Long, List<ContainerId>>> containersI =
+ this.recentlyStartedContainerTracker.entrySet().iterator();
+ Long currTime = System.currentTimeMillis();
+ while (containersI.hasNext()) {
+ Entry<Long, List<ContainerId>> containerEntry = containersI.next();
+ if (containerEntry.getKey() < currTime) {
+ containersI.remove();
+ } else {
+ break;
+ }
}
}
/**
- * Ensure the startContainer call is not using an older cached key. Will
- * return false once startContainerSuccessful is called. Does not check
- * the actual key being current since that is verified by the security layer
- * via retrievePassword.
+ * Container will be remembered based on expiration time of the container
+ * token used for starting the container. It is safe to use expiration time
+ * as there is a one-to-many mapping between expiration time and containerId.
+ * @return true if the current token identifier is not present in cache.
*/
public synchronized boolean isValidStartContainerRequest(
- ContainerId containerID) {
- ApplicationId applicationId =
- containerID.getApplicationAttemptId().getApplicationId();
- return !this.oldMasterKeys.containsKey(applicationId)
- || !this.oldMasterKeys.get(applicationId).containsKey(containerID);
- }
+ ContainerTokenIdentifier containerTokenIdentifier) {
- private synchronized void addKeyForContainerId(ContainerId containerId,
- MasterKeyData masterKeyData) {
- if (containerId != null) {
- ApplicationId appId =
- containerId.getApplicationAttemptId().getApplicationId();
- if (!this.oldMasterKeys.containsKey(appId)) {
- this.oldMasterKeys.put(appId,
- new ConcurrentHashMap<ContainerId, MasterKeyData>());
- }
- ConcurrentMap<ContainerId, MasterKeyData> containerIdToKeysMapForThisApp =
- this.oldMasterKeys.get(appId);
- containerIdToKeysMapForThisApp.put(containerId, masterKeyData);
+ removeAnyContainerTokenIfExpired();
+
+ Long expTime = containerTokenIdentifier.getExpiryTimeStamp();
+ List<ContainerId> containers =
+ this.recentlyStartedContainerTracker.get(expTime);
+ if (containers == null
+ || !containers.contains(containerTokenIdentifier.getContainerID())) {
+ return true;
} else {
- LOG.warn("Not adding key for null containerId");
+ return false;
}
}
- // Holding on to master-keys corresponding to containers until the app is
- // finished due to the multiple ways a container can finish. Avoid
- // stopContainer calls seeing unnecessary authorization exceptions.
- public synchronized void appFinished(ApplicationId appId) {
- this.oldMasterKeys.remove(appId);
- }
-
public synchronized void setNodeId(NodeId nodeId) {
nodeHostAddr = nodeId.toString();
LOG.info("Updating node address : " + nodeHostAddr);
- }
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Mon Aug 12 21:25:49 2013
@@ -195,14 +195,13 @@ public class DummyContainerManager exten
@Override
protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier,
- UserGroupInformation ugi) throws YarnException {
+ ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
// do nothing
}
@Override
protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
- Container container, boolean stopRequest) throws YarnException {
+ Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
// do nothing
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java Mon Aug 12 21:25:49 2013
@@ -61,6 +61,10 @@ public class MockNodeStatusUpdater exten
protected ResourceTracker getRMClient() {
return resourceTracker;
}
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
private static class MockResourceTracker implements ResourceTracker {
private int heartBeatID;
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java Mon Aug 12 21:25:49 2013
@@ -146,7 +146,7 @@ public class TestContainerManagerWithLCE
@Override
public void testContainerLaunchFromPreviousRM() throws InterruptedException,
- IOException {
+ IOException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
@@ -155,6 +155,29 @@ public class TestContainerManagerWithLCE
LOG.info("Running testContainerLaunchFromPreviousRM");
super.testContainerLaunchFromPreviousRM();
}
+
+ @Override
+ public void testMultipleContainersLaunch() throws Exception {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testContainerLaunchFromPreviousRM");
+ super.testMultipleContainersLaunch();
+ }
+
+ @Override
+ public void testMultipleContainersStopAndGetStatus() throws Exception {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testContainerLaunchFromPreviousRM");
+ super.testMultipleContainersStopAndGetStatus();
+ }
+
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Mon Aug 12 21:25:49 2013
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -28,16 +33,11 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
-
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;
-import static org.junit.Assert.*;
-
public class TestDeletionService {
private static final FileContext lfs = getLfs();
@@ -210,4 +210,79 @@ public class TestDeletionService {
}
assertTrue(del.isTerminated());
}
+
+ @Test (timeout=60000)
+ public void testFileDeletionTaskDependency() throws Exception {
+ FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
+ Configuration conf = new Configuration();
+ exec.setConf(conf);
+ DeletionService del = new DeletionService(exec);
+ del.init(conf);
+ del.start();
+
+ try {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ List<Path> dirs = buildDirs(r, base, 2);
+ createDirs(new Path("."), dirs);
+
+ // first we will try to delete sub directories which are present. This
+ // should then trigger parent directory to be deleted.
+ List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
+
+ FileDeletionTask dependentDeletionTask =
+ del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
+ List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+ for (Path subDir : subDirs) {
+ FileDeletionTask deletionTask =
+ del.createFileDeletionTask(null, null, new Path[] { subDir });
+ deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ deletionTasks.add(deletionTask);
+ }
+ for (FileDeletionTask task : deletionTasks) {
+ del.scheduleFileDeletionTask(task);
+ }
+
+ int msecToWait = 20 * 1000;
+ while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) {
+ Thread.sleep(100);
+ msecToWait -= 100;
+ }
+ assertFalse(lfs.util().exists(dirs.get(0)));
+
+
+ // Now we will try to delete sub directories; one of the deletion tasks will
+ // be marked as a failure, so the parent directory should not be deleted.
+ subDirs = buildDirs(r, dirs.get(1), 2);
+ subDirs.add(new Path(dirs.get(1), "absentFile"));
+
+ dependentDeletionTask =
+ del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
+ deletionTasks = new ArrayList<FileDeletionTask>();
+ for (Path subDir : subDirs) {
+ FileDeletionTask deletionTask =
+ del.createFileDeletionTask(null, null, new Path[] { subDir });
+ deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ deletionTasks.add(deletionTask);
+ }
+ // marking one of the tasks as a failure.
+ deletionTasks.get(2).setSuccess(false);
+ for (FileDeletionTask task : deletionTasks) {
+ del.scheduleFileDeletionTask(task);
+ }
+
+ msecToWait = 20 * 1000;
+ while (msecToWait > 0
+ && (lfs.util().exists(subDirs.get(0)) || lfs.util().exists(
+ subDirs.get(1)))) {
+ Thread.sleep(100);
+ msecToWait -= 100;
+ }
+ assertTrue(lfs.util().exists(dirs.get(1)));
+ } finally {
+ del.stop();
+ }
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Mon Aug 12 21:25:49 2013
@@ -20,18 +20,19 @@ package org.apache.hadoop.yarn.server.no
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -46,7 +47,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Test;
@@ -62,7 +62,7 @@ public class TestEventFlow {
private static File remoteLogDir = new File("target",
TestEventFlow.class.getName() + "-remoteLogDir").getAbsoluteFile();
private static final long SIMULATED_RM_IDENTIFIER = 1234;
-
+
@Test
public void testSuccessfulContainerLaunch() throws InterruptedException,
IOException, YarnException {
@@ -107,6 +107,11 @@ public class TestEventFlow {
return new LocalRMInterface();
};
+ @Override
+ protected void stopRMProxy() {
+ return; // No-op override: returns immediately without doing anything.
+ }
+
@Override
protected void startStatusUpdater() {
return; // Don't start any updating thread.
@@ -135,21 +140,25 @@ public class TestEventFlow {
ContainerId cID = ContainerId.newInstance(applicationAttemptId, 0);
String user = "testing";
- StartContainerRequest request =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- request.setContainerLaunchContext(launchContext);
- request.setContainerToken(TestContainerManager.createContainerToken(cID,
- SIMULATED_RM_IDENTIFIER, context.getNodeId(), user,
- context.getContainerTokenSecretManager()));
- containerManager.startContainer(request);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(launchContext,
+ TestContainerManager.createContainerToken(cID,
+ SIMULATED_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
ContainerState.RUNNING);
- StopContainerRequest stopRequest =
- recordFactory.newRecordInstance(StopContainerRequest.class);
- stopRequest.setContainerId(cID);
- containerManager.stopContainer(stopRequest);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cID);
+ StopContainersRequest stopRequest =
+ StopContainersRequest.newInstance(containerIds);
+ containerManager.stopContainers(stopRequest);
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
ContainerState.COMPLETE);
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java Mon Aug 12 21:25:49 2013
@@ -204,8 +204,10 @@ public class TestNMAuditLogger {
public void testNMAuditLoggerWithIP() throws Exception {
Configuration conf = new Configuration();
// start the IPC server
- Server server = RPC.getServer(TestProtocol.class,
- new MyTestRPCServer(), "0.0.0.0", 0, 5, true, conf, null);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
+ .setPort(0).setNumHandlers(5).setVerbose(true).build();
+
server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server);
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java Mon Aug 12 21:25:49 2013
@@ -39,8 +39,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -69,8 +71,8 @@ import org.mockito.ArgumentMatcher;
public class TestNodeManagerReboot {
- static final File basedir =
- new File("target", TestNodeManagerReboot.class.getName());
+ static final File basedir = new File("target",
+ TestNodeManagerReboot.class.getName());
static final File logsDir = new File(basedir, "logs");
static final File nmLocalDir = new File(basedir, "nm0");
static final File localResourceDir = new File(basedir, "resource");
@@ -100,7 +102,8 @@ public class TestNodeManagerReboot {
nm = new MyNodeManager();
nm.start();
- final ContainerManagementProtocol containerManager = nm.getContainerManager();
+ final ContainerManagementProtocol containerManager =
+ nm.getContainerManager();
// create files under fileCache
createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100);
@@ -112,16 +115,13 @@ public class TestNodeManagerReboot {
ContainerId cId = createContainerId();
URL localResourceUri =
- ConverterUtils.getYarnUrlFromPath(localFS
- .makeQualified(new Path(localResourceDir.getAbsolutePath())));
+ ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(
+ localResourceDir.getAbsolutePath())));
LocalResource localResource =
- Records.newRecord(LocalResource.class);
- localResource.setResource(localResourceUri);
- localResource.setSize(-1);
- localResource.setVisibility(LocalResourceVisibility.APPLICATION);
- localResource.setType(LocalResourceType.FILE);
- localResource.setTimestamp(localResourceDir.lastModified());
+ LocalResource.newInstance(localResourceUri, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION, -1,
+ localResourceDir.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
@@ -129,32 +129,38 @@ public class TestNodeManagerReboot {
containerLaunchContext.setLocalResources(localResources);
List<String> commands = new ArrayList<String>();
containerLaunchContext.setCommands(commands);
-
- final StartContainerRequest startRequest =
- Records.newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
+
NodeId nodeId = nm.getNMContext().getNodeId();
- startRequest.setContainerToken(TestContainerManager.createContainerToken(
- cId, 0, nodeId, destinationFile, nm.getNMContext()
- .getContainerTokenSecretManager()));
- final UserGroupInformation currentUser = UserGroupInformation
- .createRemoteUser(cId.getApplicationAttemptId().toString());
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ TestContainerManager.createContainerToken(
+ cId, 0, nodeId, destinationFile, nm.getNMContext()
+ .getContainerTokenSecretManager()));
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ final StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+
+ final UserGroupInformation currentUser =
+ UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId()
+ .toString());
NMTokenIdentifier nmIdentifier =
new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123);
currentUser.addTokenIdentifier(nmIdentifier);
currentUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws YarnException, IOException {
- nm.getContainerManager().startContainer(startRequest);
+ nm.getContainerManager().startContainers(allRequests);
return null;
}
});
- GetContainerStatusRequest request =
- Records.newRecord(GetContainerStatusRequest.class);
- request.setContainerId(cId);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
+ GetContainerStatusesRequest request =
+ GetContainerStatusesRequest.newInstance(containerIds);
Container container =
- nm.getNMContext().getContainers().get(request.getContainerId());
+ nm.getNMContext().getContainers().get(request.getContainerIds().get(0));
final int MAX_TRIES = 20;
int numTries = 0;
@@ -170,27 +176,31 @@ public class TestNodeManagerReboot {
Assert.assertEquals(ContainerState.DONE, container.getContainerState());
- Assert.assertTrue(
- "The container should create a subDir named currentUser: " + user +
- "under localDir/usercache",
+ Assert
+ .assertTrue(
+ "The container should create a subDir named currentUser: " + user
+ + "under localDir/usercache",
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
- ContainerLocalizer.USERCACHE) > 0);
+ ContainerLocalizer.USERCACHE) > 0);
- Assert.assertTrue("There should be files or Dirs under nm_private when " +
- "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ Assert.assertTrue(
+ "There should be files or Dirs under nm_private when "
+ + "container is launched",
+ numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
// restart the NodeManager
nm.stop();
nm = new MyNodeManager();
- nm.start();
+ nm.start();
numTries = 0;
- while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
- .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
- ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
- .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
- > 0) && numTries < MAX_TRIES) {
+ while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.USERCACHE) > 0
+ || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
+ nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
+ && numTries < MAX_TRIES) {
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
@@ -199,21 +209,27 @@ public class TestNodeManagerReboot {
numTries++;
}
- Assert.assertTrue("After NM reboots, all local files should be deleted",
- numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
- .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
- ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
- .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
- == 0);
+ Assert
+ .assertTrue(
+ "After NM reboots, all local files should be deleted",
+ numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.USERCACHE) == 0
+ && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ContainerLocalizer.FILECACHE) == 0
+ && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+ ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
verify(delService, times(1)).delete(
- (String) isNull(),
- argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
- + "_DEL_")));
+ (String) isNull(),
+ argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
+ + "_DEL_")));
verify(delService, times(1)).delete((String) isNull(),
- argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
- verify(delService, times(1)).delete((String) isNull(),
- argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
-
+ argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
+ verify(delService, times(1)).scheduleFileDeletionTask(
+ argThat(new FileDeletionInclude(user, null,
+ new String[] { destinationFile })));
+ verify(delService, times(1)).scheduleFileDeletionTask(
+ argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
+ + "_DEL_", new String[] {})));
}
private int numOfLocalDirs(String localDir, String localSubDir) {
@@ -238,7 +254,8 @@ public class TestNodeManagerReboot {
private ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
return containerId;
}
@@ -253,8 +270,8 @@ public class TestNodeManagerReboot {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
- MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
- context, dispatcher, healthChecker, metrics);
+ MockNodeStatusUpdater myNodeStatusUpdater =
+ new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics);
return myNodeStatusUpdater;
}
@@ -288,4 +305,58 @@ public class TestNodeManagerReboot {
return ((Path) o).getName().indexOf(part) != -1;
}
}
+
+ /**
+ * Argument matcher for {@link FileDeletionTask} instances. It compares a
+ * task's user, sub directory and base directories against the expected
+ * values supplied to the constructor. Paths are compared by substring
+ * containment (see {@link #comparePaths(Path, String)}), not by equality.
+ */
+ class FileDeletionInclude extends ArgumentMatcher<FileDeletionTask> {
+ // Expected user of the task; null means the task must have no user.
+ final String user;
+ // Substring expected in the task's sub directory path; null means the
+ // task must have no sub directory.
+ final String subDirIncludes;
+ // Substrings expected in the task's base directory paths, compared
+ // position by position; null means the task must have no base directories.
+ final String[] baseDirIncludes;
+
+ /**
+ * Stores the expected values; no validation or copying is performed.
+ *
+ * @param user expected user, may be null
+ * @param subDirIncludes expected sub directory substring, may be null
+ * @param baseDirIncludes expected base directory substrings, may be null
+ */
+ public FileDeletionInclude(String user, String subDirIncludes,
+ String [] baseDirIncludes) {
+ this.user = user;
+ this.subDirIncludes = subDirIncludes;
+ this.baseDirIncludes = baseDirIncludes;
+ }
+
+ /**
+ * Decides whether the given object, cast to {@link FileDeletionTask},
+ * matches the expected values.
+ *
+ * @param o the argument to test; cast unconditionally to FileDeletionTask
+ * @return true if the task matches, false otherwise
+ */
+ @Override
+ public boolean matches(Object o) {
+ FileDeletionTask fd = (FileDeletionTask)o;
+ // User check: exactly one side being null is a mismatch.
+ if (fd.getUser() == null && user != null) {
+ return false;
+ } else if (fd.getUser() != null && user == null) {
+ return false;
+ } else if (fd.getUser() != null && user != null) {
+ // NOTE(review): this returns the user comparison result directly, so the
+ // sub directory and base directory checks below only run when both users
+ // are null. Confirm whether that is intended before tightening it; the
+ // verify(...) calls that use this matcher may rely on the lenient match.
+ return fd.getUser().equals(user);
+ }
+ // Reached only when both the task's user and the expected user are null.
+ if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
+ return false;
+ }
+ // Base directories: null on exactly one side is a mismatch.
+ if (baseDirIncludes == null && fd.getBaseDirs() != null) {
+ return false;
+ } else if (baseDirIncludes != null && fd.getBaseDirs() == null ) {
+ return false;
+ } else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
+ // Both present: the counts must agree, then each pair is compared in order.
+ if (baseDirIncludes.length != fd.getBaseDirs().size()) {
+ return false;
+ }
+ for (int i =0 ; i < baseDirIncludes.length; i++) {
+ if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Compares a path against an expected substring.
+ *
+ * @param p1 the actual path, may be null
+ * @param p2 the expected substring, may be null
+ * @return true if both are null, or if both are non-null and the path
+ * component of {@code p1.toUri()} contains {@code p2}; false if exactly
+ * one of them is null or the substring is not contained
+ */
+ public boolean comparePaths(Path p1, String p2) {
+ if (p1 == null && p2 != null){
+ return false;
+ } else if (p1 != null && p2 == null) {
+ return false;
+ } else if (p1 != null && p2 != null ){
+ return p1.toUri().getPath().contains(p2.toString());
+ }
+ // Both are null.
+ return true;
+ }
+ }
}