You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/09/14 00:49:38 UTC
svn commit: r1170378 [6/12] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-clie...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Tue Sep 13 22:49:27 2011
@@ -28,21 +28,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.service.AbstractService;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.*;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DeletionService extends AbstractService {
-
static final Log LOG = LogFactory.getLog(DeletionService.class);
- /** Delay before deleting resource to ease debugging of NM issues */
- static final String DEBUG_DELAY_SEC =
- NMConfig.NM_PREFIX + "debug.delete.delay";
-
private int debugDelay;
private final ContainerExecutor exec;
private ScheduledThreadPoolExecutor sched;
@@ -79,10 +73,10 @@ public class DeletionService extends Abs
public void init(Configuration conf) {
if (conf != null) {
sched = new ScheduledThreadPoolExecutor(
- conf.getInt(NM_MAX_DELETE_THREADS, DEFAULT_MAX_DELETE_THREADS));
- debugDelay = conf.getInt(DEBUG_DELAY_SEC, 0);
+ conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT));
+ debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
} else {
- sched = new ScheduledThreadPoolExecutor(DEFAULT_MAX_DELETE_THREADS);
+ sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT);
}
sched.setKeepAliveTime(60L, SECONDS);
super.init(conf);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue Sep 13 22:49:27 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.util.Shell.Exit
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@@ -44,8 +45,6 @@ public class LinuxContainerExecutor exte
.getLog(LinuxContainerExecutor.class);
private String containerExecutorExe;
- protected static final String CONTAINER_EXECUTOR_EXEC_KEY =
- NMConfig.NM_PREFIX + "linux-container-executor.path";
@Override
public void setConf(Configuration conf) {
@@ -98,7 +97,7 @@ public class LinuxContainerExecutor exte
new File(hadoopBin, "container-executor").getAbsolutePath();
return null == conf
? defaultPath
- : conf.get(CONTAINER_EXECUTOR_EXEC_KEY, defaultPath);
+ : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Sep 13 22:49:27 2011
@@ -18,9 +18,6 @@
package org.apache.hadoop.yarn.server.nodemanager;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_CONTAINER_EXECUTOR_CLASS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_KEYTAB;
-
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -32,6 +29,7 @@ import org.apache.hadoop.NodeHealthCheck
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
@@ -42,27 +40,29 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
public class NodeManager extends CompositeService {
private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+ protected ContainerTokenSecretManager containerTokenSecretManager;
public NodeManager() {
super(NodeManager.class.getName());
}
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics);
+ metrics, containerTokenSecretManager);
}
protected NodeResourceMonitor createNodeResourceMonitor() {
@@ -71,9 +71,10 @@ public class NodeManager extends Composi
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
- NodeStatusUpdater nodeStatusUpdater) {
+ NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager
+ containerTokenSecretManager) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
- metrics);
+ metrics, containerTokenSecretManager);
}
protected WebServer createWebServer(Context nmContext,
@@ -82,8 +83,8 @@ public class NodeManager extends Composi
}
protected void doSecureLogin() throws IOException {
- SecurityUtil.login(getConfig(), NM_KEYTAB,
- YarnServerConfig.NM_SERVER_PRINCIPAL_KEY);
+ SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
+ YarnConfiguration.NM_PRINCIPAL);
}
@Override
@@ -91,8 +92,15 @@ public class NodeManager extends Composi
Context context = new NMContext();
+ // Create the secretManager if need be.
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOG.info("Security is enabled on NodeManager. "
+ + "Creating ContainerTokenSecretManager");
+ this.containerTokenSecretManager = new ContainerTokenSecretManager();
+ }
+
ContainerExecutor exec = ReflectionUtils.newInstance(
- conf.getClass(NM_CONTAINER_EXECUTOR_CLASS,
+ conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
DeletionService del = new DeletionService(exec);
addService(del);
@@ -106,18 +114,16 @@ public class NodeManager extends Composi
addService(healthChecker);
}
- // StatusUpdater should be added first so that it can start first. Once it
- // contacts RM, does registration and gets tokens, then only
- // ContainerManager can start.
NodeStatusUpdater nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, healthChecker);
- addService(nodeStatusUpdater);
+ createNodeStatusUpdater(context, dispatcher, healthChecker,
+ this.containerTokenSecretManager);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
ContainerManagerImpl containerManager =
- createContainerManager(context, exec, del, nodeStatusUpdater);
+ createContainerManager(context, exec, del, nodeStatusUpdater,
+ this.containerTokenSecretManager);
addService(containerManager);
Service webServer =
@@ -136,6 +142,10 @@ public class NodeManager extends Composi
DefaultMetricsSystem.initialize("NodeManager");
+ // StatusUpdater should be added last so that it gets started last,
+ // ensuring that everything is up before registering with the RM.
+ addService(nodeStatusUpdater);
+
super.init(conf);
// TODO add local dirs to del
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Sep 13 22:49:27 2011
@@ -30,13 +30,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records;
@@ -69,12 +69,12 @@ public class NodeStatusUpdaterImpl exten
private final Context context;
private final Dispatcher dispatcher;
+ private ContainerTokenSecretManager containerTokenSecretManager;
private long heartBeatInterval;
private ResourceTracker resourceTracker;
private String rmAddress;
private Resource totalResource;
private String containerManagerBindAddress;
- private String nodeHttpAddress;
private String hostName;
private int containerManagerPort;
private int httpPort;
@@ -87,23 +87,25 @@ public class NodeStatusUpdaterImpl exten
private final NodeManagerMetrics metrics;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
- NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+ ContainerTokenSecretManager containerTokenSecretManager) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
+ this.containerTokenSecretManager = containerTokenSecretManager;
}
@Override
public synchronized void init(Configuration conf) {
this.rmAddress =
- conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
- YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
+ conf.get(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
this.heartBeatInterval =
- conf.getLong(NMConfig.HEARTBEAT_INTERVAL,
- NMConfig.DEFAULT_HEARTBEAT_INTERVAL);
- int memory = conf.getInt(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB);
+ conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
+ int memory = conf.getInt(YarnConfiguration.NM_VMEM_GB, YarnConfiguration.DEFAULT_NM_VMEM_GB);
this.totalResource = recordFactory.newRecordInstance(Resource.class);
this.totalResource.setMemory(memory * 1024);
metrics.addResource(totalResource);
@@ -113,13 +115,13 @@ public class NodeStatusUpdaterImpl exten
@Override
public void start() {
String cmBindAddressStr =
- getConfig().get(NMConfig.NM_BIND_ADDRESS,
- NMConfig.DEFAULT_NM_BIND_ADDRESS);
+ getConfig().get(YarnConfiguration.NM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_ADDRESS);
InetSocketAddress cmBindAddress =
NetUtils.createSocketAddr(cmBindAddressStr);
String httpBindAddressStr =
- getConfig().get(NMConfig.NM_HTTP_BIND_ADDRESS,
- NMConfig.DEFAULT_NM_HTTP_BIND_ADDRESS);
+ getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
InetSocketAddress httpBindAddress =
NetUtils.createSocketAddr(httpBindAddressStr);
try {
@@ -128,7 +130,6 @@ public class NodeStatusUpdaterImpl exten
this.httpPort = httpBindAddress.getPort();
this.containerManagerBindAddress =
this.hostName + ":" + this.containerManagerPort;
- this.nodeHttpAddress = this.hostName + ":" + this.httpPort;
LOG.info("Configured ContainerManager Address is "
+ this.containerManagerBindAddress);
// Registration has to be in start so that ContainerManager can get the
@@ -176,8 +177,18 @@ public class NodeStatusUpdaterImpl exten
this.secretKeyBytes = regResponse.getSecretKey().array();
}
+ // Do this now so that it's set before we start heartbeating to the RM
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOG.info("Security enabled - updating secret keys now");
+ // It is expected that status updater is started by this point and
+ // RM gives the shared secret in registration during StatusUpdater#start().
+ this.containerTokenSecretManager.setSecretKey(
+ this.getContainerManagerBindAddress(),
+ this.getRMNMSharedSecret());
+ }
LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress
+ " with total resource of " + this.totalResource);
+
}
@Override
@@ -196,35 +207,28 @@ public class NodeStatusUpdaterImpl exten
nodeStatus.setNodeId(this.nodeId);
int numActiveContainers = 0;
+ List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
ContainerId containerId = e.getKey();
Container container = e.getValue();
- List<org.apache.hadoop.yarn.api.records.Container> applicationContainers = nodeStatus
- .getContainers(container.getContainerID().getAppId());
- if (applicationContainers == null) {
- applicationContainers = new ArrayList<org.apache.hadoop.yarn.api.records.Container>();
- nodeStatus.setContainers(container.getContainerID().getAppId(),
- applicationContainers);
- }
-
// Clone the container to send it to the RM
- org.apache.hadoop.yarn.api.records.Container c = container.cloneAndGetContainer();
- c.setNodeId(this.nodeId);
- c.setNodeHttpAddress(this.nodeHttpAddress); // TODO: don't set everytime.
- applicationContainers.add(c);
+ org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
+ container.cloneAndGetContainerStatus();
+ containersStatuses.add(containerStatus);
++numActiveContainers;
- LOG.info("Sending out status for container: " + c);
+ LOG.info("Sending out status for container: " + containerStatus);
- if (c.getState() == ContainerState.COMPLETE) {
+ if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove
i.remove();
LOG.info("Removed completed container " + containerId);
}
}
+ nodeStatus.setContainersStatuses(containersStatuses);
LOG.debug(this.containerManagerBindAddress + " sending out status for " + numActiveContainers
+ " containers");
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Tue Sep 13 22:49:27 2011
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.Service;
@@ -40,9 +42,6 @@ public class AuxServices extends Abstrac
private static final Log LOG = LogFactory.getLog(AuxServices.class);
- public static final String AUX_SERVICES = "nodemanager.auxiluary.services";
- public static final String AUX_SERVICE_CLASS_FMT =
- "nodemanager.aux.service.%s.class";
public final Map<String,AuxiliaryService> serviceMap;
public final Map<String,ByteBuffer> serviceMeta;
@@ -72,16 +71,24 @@ public class AuxServices extends Abstrac
* the the name of the service as defined in the configuration.
*/
public Map<String, ByteBuffer> getMeta() {
- return Collections.unmodifiableMap(serviceMeta);
+ Map<String, ByteBuffer> metaClone = new HashMap<String, ByteBuffer>(
+ serviceMeta.size());
+ synchronized (serviceMeta) {
+ for (Entry<String, ByteBuffer> entry : serviceMeta.entrySet()) {
+ metaClone.put(entry.getKey(), entry.getValue().duplicate());
+ }
+ }
+ return metaClone;
}
@Override
public void init(Configuration conf) {
- Collection<String> auxNames = conf.getStringCollection(AUX_SERVICES);
+ Collection<String> auxNames = conf.getStringCollection(
+ YarnConfiguration.NM_AUX_SERVICES);
for (final String sName : auxNames) {
try {
Class<? extends AuxiliaryService> sClass = conf.getClass(
- String.format(AUX_SERVICE_CLASS_FMT, sName), null,
+ String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null,
AuxiliaryService.class);
if (null == sClass) {
throw new RuntimeException("No class defiend for " + sName);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Sep 13 22:49:27 2011
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
import java.io.IOException;
@@ -31,7 +29,6 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@@ -67,7 +64,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -120,7 +116,8 @@ public class ContainerManagerImpl extend
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
- NodeManagerMetrics metrics) {
+ NodeManagerMetrics metrics, ContainerTokenSecretManager
+ containerTokenSecretManager) {
super(ContainerManagerImpl.class.getName());
this.context = context;
dispatcher = new AsyncDispatcher();
@@ -135,12 +132,7 @@ public class ContainerManagerImpl extend
addService(containersLauncher);
this.nodeStatusUpdater = nodeStatusUpdater;
- // Create the secretManager if need be.
- if (UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Security is enabled on NodeManager. "
- + "Creating ContainerTokenSecretManager");
- this.containerTokenSecretManager = new ContainerTokenSecretManager();
- }
+ this.containerTokenSecretManager = containerTokenSecretManager;
// Start configurable services
auxiluaryServices = new AuxServices();
@@ -190,7 +182,7 @@ public class ContainerManagerImpl extend
@Override
public void init(Configuration conf) {
cmBindAddressStr = NetUtils.createSocketAddr(
- conf.get(NM_BIND_ADDRESS, DEFAULT_NM_BIND_ADDRESS));
+ conf.get(YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS));
super.init(conf);
}
@@ -200,22 +192,14 @@ public class ContainerManagerImpl extend
// Enqueue user dirs in deletion context
YarnRPC rpc = YarnRPC.create(getConfig());
- if (UserGroupInformation.isSecurityEnabled()) {
- // This is fine as status updater is started before ContainerManager and
- // RM gives the shared secret in registration during StatusUpdter#start()
- // itself.
- this.containerTokenSecretManager.setSecretKey(
- this.nodeStatusUpdater.getContainerManagerBindAddress(),
- this.nodeStatusUpdater.getRMNMSharedSecret());
- }
Configuration cmConf = new Configuration(getConfig());
cmConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
ContainerManagerSecurityInfo.class, SecurityInfo.class);
server =
rpc.getServer(ContainerManager.class, this, cmBindAddressStr, cmConf,
this.containerTokenSecretManager,
- cmConf.getInt(NMConfig.NM_CONTAINER_MGR_THREADS,
- NMConfig.DEFAULT_NM_CONTAINER_MGR_THREADS));
+ cmConf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
LOG.info("ContainerManager started at " + cmBindAddressStr);
server.start();
super.start();
@@ -266,7 +250,8 @@ public class ContainerManagerImpl extend
Container container =
new ContainerImpl(this.dispatcher, launchContext, credentials, metrics);
ContainerId containerID = launchContext.getContainerId();
- ApplicationId applicationID = containerID.getAppId();
+ ApplicationId applicationID =
+ containerID.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerID, container) != null) {
NMAuditLogger.logFailure(launchContext.getUser(),
AuditConstants.START_CONTAINER, "ContainerManagerImpl",
@@ -311,10 +296,18 @@ public class ContainerManagerImpl extend
Container container = this.context.getContainers().get(containerID);
if (container == null) {
LOG.warn("Trying to stop unknown container " + containerID);
- NMAuditLogger.logFailure(container.getUser(),
+ String userName;
+ try {
+ userName = UserGroupInformation.getCurrentUser().getUserName();
+ } catch (IOException e) {
+ LOG.error("Error finding userName", e);
+ return response;
+ }
+ NMAuditLogger.logFailure(userName,
AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
"Trying to stop unknown container!",
- containerID.getAppId(), containerID);
+ containerID.getApplicationAttemptId().getApplicationId(),
+ containerID);
return response; // Return immediately.
}
dispatcher.getEventHandler().handle(
@@ -326,7 +319,8 @@ public class ContainerManagerImpl extend
// should be the same or should be rejected by auth before here.
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl",
- containerID.getAppId(), containerID);
+ containerID.getApplicationAttemptId().getApplicationId(),
+ containerID);
// TODO: Move this code to appropriate place once kill_container is
// implemented.
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java Tue Sep 13 22:49:27 2011
@@ -25,7 +25,7 @@ public class ApplicationContainerFinishe
public ApplicationContainerFinishedEvent(
ContainerId containerID) {
- super(containerID.getAppId(),
+ super(containerID.getApplicationAttemptId().getApplicationId(),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
this.containerID = containerID;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java Tue Sep 13 22:49:27 2011
@@ -25,7 +25,7 @@ public class ApplicationInitEvent extend
private final Container container;
public ApplicationInitEvent(Container container) {
- super(container.getContainerID().getAppId(),
+ super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
ApplicationEventType.INIT_APPLICATION);
this.container = container;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Tue Sep 13 22:49:27 2011
@@ -40,8 +40,6 @@ public interface Container extends Event
Map<Path,String> getLocalizedResources();
- org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer();
-
ContainerStatus cloneAndGetContainerStatus();
String toString();
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Sep 13 22:49:27 2011
@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.no
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -50,9 +53,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@@ -72,7 +74,7 @@ public class ContainerImpl implements Co
private final Credentials credentials;
private final NodeManagerMetrics metrics;
private final ContainerLaunchContext launchContext;
- private String exitCode = "NA";
+ private int exitCode = YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS;
private final StringBuilder diagnostics;
private static final Log LOG = LogFactory.getLog(Container.class);
@@ -81,6 +83,12 @@ public class ContainerImpl implements Co
new HashMap<LocalResourceRequest,String>();
private final Map<Path,String> localizedResources =
new HashMap<Path,String>();
+ private final List<LocalResourceRequest> publicRsrcs =
+ new ArrayList<LocalResourceRequest>();
+ private final List<LocalResourceRequest> privateRsrcs =
+ new ArrayList<LocalResourceRequest>();
+ private final List<LocalResourceRequest> appRsrcs =
+ new ArrayList<LocalResourceRequest>();
public ContainerImpl(Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@@ -318,24 +326,6 @@ public class ContainerImpl implements Co
}
@Override
- public
- org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() {
- this.readLock.lock();
- try {
- org.apache.hadoop.yarn.api.records.Container c =
- recordFactory.newRecordInstance(
- org.apache.hadoop.yarn.api.records.Container.class);
- c.setId(this.launchContext.getContainerId());
- c.setResource(this.launchContext.getResource());
- c.setState(getCurrentState());
- c.setContainerStatus(cloneAndGetContainerStatus());
- return c;
- } finally {
- this.readLock.unlock();
- }
- }
-
- @Override
public ContainerLaunchContext getLaunchContext() {
this.readLock.lock();
try {
@@ -354,14 +344,14 @@ public class ContainerImpl implements Co
containerStatus.setState(getCurrentState());
containerStatus.setContainerId(this.launchContext.getContainerId());
containerStatus.setDiagnostics(diagnostics.toString());
- containerStatus.setExitStatus(String.valueOf(exitCode));
+ containerStatus.setExitStatus(exitCode);
return containerStatus;
} finally {
this.readLock.unlock();
}
}
- @SuppressWarnings("fallthrough")
+ @SuppressWarnings({"fallthrough", "unchecked"})
private void finished() {
switch (getContainerState()) {
case EXITED_WITH_SUCCESS:
@@ -369,7 +359,8 @@ public class ContainerImpl implements Co
metrics.completedContainer();
NMAuditLogger.logSuccess(getUser(),
AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
- getContainerID().getAppId(), getContainerID());
+ getContainerID().getApplicationAttemptId().getApplicationId(),
+ getContainerID());
break;
case EXITED_WITH_FAILURE:
metrics.endRunningContainer();
@@ -379,7 +370,8 @@ public class ContainerImpl implements Co
NMAuditLogger.logFailure(getUser(),
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
"Container failed with state: " + getContainerState(),
- getContainerID().getAppId(), getContainerID());
+ getContainerID().getApplicationAttemptId().getApplicationId(),
+ getContainerID());
break;
case CONTAINER_CLEANEDUP_AFTER_KILL:
metrics.endRunningContainer();
@@ -388,13 +380,15 @@ public class ContainerImpl implements Co
metrics.killedContainer();
NMAuditLogger.logSuccess(getUser(),
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
- getContainerID().getAppId(), getContainerID());
+ getContainerID().getApplicationAttemptId().getApplicationId(),
+ getContainerID());
}
metrics.releaseContainer(getLaunchContext().getResource());
// Inform the application
ContainerId containerID = getContainerID();
+ @SuppressWarnings("rawtypes")
EventHandler eventHandler = dispatcher.getEventHandler();
eventHandler.handle(new ApplicationContainerFinishedEvent(containerID));
// Remove the container from the resource-monitor
@@ -404,6 +398,24 @@ public class ContainerImpl implements Co
containerID, exitCode));
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void cleanup() {
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ if (!publicRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
+ }
+ if (!privateRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
+ }
+ if (!appRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
+ }
+ dispatcher.getEventHandler().handle(
+ new ContainerLocalizationCleanupEvent(this, rsrc));
+ }
+
static class ContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@@ -424,27 +436,22 @@ public class ContainerImpl implements Co
container.metrics.initingContainer();
// Inform the AuxServices about the opaque serviceData
- Map<String,ByteBuffer> csd = ctxt.getAllServiceData();
+ Map<String,ByteBuffer> csd = ctxt.getServiceData();
if (csd != null) {
// This can happen more than once per Application as each container may
// have distinct service data
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
container.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
- ctxt.getUser(), ctxt.getContainerId().getAppId(),
+ ctxt.getUser(),
+ ctxt.getContainerId().getApplicationAttemptId().getApplicationId(),
service.getKey().toString(), service.getValue()));
}
}
// Send requests for public, private resources
- Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources();
+ Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty()) {
- ArrayList<LocalResourceRequest> publicRsrc =
- new ArrayList<LocalResourceRequest>();
- ArrayList<LocalResourceRequest> privateRsrc =
- new ArrayList<LocalResourceRequest>();
- ArrayList<LocalResourceRequest> appRsrc =
- new ArrayList<LocalResourceRequest>();
try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try {
@@ -453,13 +460,13 @@ public class ContainerImpl implements Co
container.pendingResources.put(req, rsrc.getKey());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
- publicRsrc.add(req);
+ container.publicRsrcs.add(req);
break;
case PRIVATE:
- privateRsrc.add(req);
+ container.privateRsrcs.add(req);
break;
case APPLICATION:
- appRsrc.add(req);
+ container.appRsrcs.add(req);
break;
}
} catch (URISyntaxException e) {
@@ -471,27 +478,25 @@ public class ContainerImpl implements Co
} catch (URISyntaxException e) {
// malformed resource; abort container launch
LOG.warn("Failed to parse resource-request", e);
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZATION_FAILED;
}
- if (!publicRsrc.isEmpty()) {
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationRequestEvent(
- container, publicRsrc, LocalResourceVisibility.PUBLIC));
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ if (!container.publicRsrcs.isEmpty()) {
+ req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
}
- if (!privateRsrc.isEmpty()) {
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationRequestEvent(
- container, privateRsrc, LocalResourceVisibility.PRIVATE));
+ if (!container.privateRsrcs.isEmpty()) {
+ req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
}
- if (!appRsrc.isEmpty()) {
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationRequestEvent(
- container, appRsrc, LocalResourceVisibility.APPLICATION));
+ if (!container.appRsrcs.isEmpty()) {
+ req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
}
+
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
container.dispatcher.getEventHandler().handle(
@@ -546,7 +551,6 @@ public class ContainerImpl implements Co
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@@ -554,31 +558,25 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
- container.exitCode = String.valueOf(exitEvent.getExitCode());
+ container.exitCode = exitEvent.getExitCode();
// TODO: Add containerWorkDir to the deletion service.
// TODO: Add containerOuputDir to the deletion service.
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -592,30 +590,24 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
container.metrics.endInitingContainer();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
container.metrics.endInitingContainer();
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -647,23 +639,19 @@ public class ContainerImpl implements Co
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
- container.exitCode = String.valueOf(exitEvent.getExitCode());
+ container.exitCode = exitEvent.getExitCode();
// The process/process-grp is killed. Decrement reference counts and
// cleanup resources
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -697,7 +685,8 @@ public class ContainerImpl implements Co
newState =
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.warn("Can't handle this event at current state", e);
+ LOG.warn("Can't handle this event at current state: Current: ["
+ + oldState + "], eventType: [" + event.getType() + "]", e);
}
if (oldState != newState) {
LOG.info("Container " + containerID + " transitioned from "
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue Sep 13 22:49:27 2011
@@ -45,10 +45,10 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -79,7 +79,7 @@ public class ContainerLaunch implements
this.exec = exec;
this.container = container;
this.dispatcher = dispatcher;
- this.logDirsSelector = new LocalDirAllocator(NMConfig.NM_LOG_DIR);
+ this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
}
@Override
@@ -89,8 +89,8 @@ public class ContainerLaunch implements
final Map<Path,String> localResources = container.getLocalizedResources();
String containerIdStr = ConverterUtils.toString(container.getContainerID());
final String user = launchContext.getUser();
- final Map<String,String> env = launchContext.getAllEnv();
- final List<String> command = launchContext.getCommandList();
+ final Map<String,String> env = launchContext.getEnv();
+ final List<String> command = launchContext.getCommands();
int ret = -1;
try {
@@ -107,10 +107,9 @@ public class ContainerLaunch implements
newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toUri().getPath()));
}
- launchContext.clearCommands();
- launchContext.addAllCommands(newCmds);
+ launchContext.setCommands(newCmds);
- Map<String, String> envs = launchContext.getAllEnv();
+ Map<String, String> envs = launchContext.getEnv();
Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
for (Entry<String, String> entry : envs.entrySet()) {
newEnvs.put(
@@ -119,13 +118,12 @@ public class ContainerLaunch implements
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toUri().getPath()));
}
- launchContext.clearEnv();
- launchContext.addAllEnv(newEnvs);
+ launchContext.setEnv(newEnvs);
// /////////////////////////// End of variable expansion
FileContext lfs = FileContext.getLocalFSFileContext();
LocalDirAllocator lDirAllocator =
- new LocalDirAllocator(NMConfig.NM_LOCAL_DIR); // TODO
+ new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
Path nmPrivateContainerScriptPath =
lDirAllocator.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
@@ -152,8 +150,8 @@ public class ContainerLaunch implements
try {
// /////////// Write out the container-script in the nmPrivate space.
String[] localDirs =
- this.conf.getStrings(NMConfig.NM_LOCAL_DIR,
- NMConfig.DEFAULT_NM_LOCAL_DIR);
+ this.conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS,
+ YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
List<Path> appDirs = new ArrayList<Path>(localDirs.length);
for (String localDir : localDirs) {
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@@ -170,7 +168,7 @@ public class ContainerLaunch implements
containerWorkDir, FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
writeLaunchEnv(containerScriptOutStream, env, localResources,
- launchContext.getCommandList(), appDirs);
+ launchContext.getCommands(), appDirs);
// /////////// End of writing out container-script
// /////////// Write out the container-tokens in the nmPrivate space.
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue Sep 13 22:49:27 2011
@@ -103,7 +103,8 @@ public class ContainersLauncher extends
switch (event.getType()) {
case LAUNCH_CONTAINER:
Application app =
- context.getApplications().get(containerId.getAppId());
+ context.getApplications().get(
+ containerId.getApplicationAttemptId().getApplicationId());
ContainerLaunch launch =
new ContainerLaunch(getConfig(), dispatcher, exec, app,
event.getContainer());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue Sep 13 22:49:27 2011
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -274,7 +273,7 @@ public class ContainerLocalizer {
stat.setLocalPath(
ConverterUtils.getYarnUrlFromPath(localPath));
stat.setLocalSize(
- FileUtil.getDU(new File(localPath.getParent().toString())));
+ FileUtil.getDU(new File(localPath.getParent().toUri())));
stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
} catch (ExecutionException e) {
stat.setStatus(ResourceStatusType.FETCH_FAILURE);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Sep 13 22:49:27 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.nod
* {@link LocalResourceVisibility}.
*
*/
+
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@@ -83,7 +84,7 @@ class LocalResourcesTrackerImpl implemen
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
- LocalizedResource rsrc = localrsrc.remove(rem.getRequest());
+ LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) {
LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
" from " + getUser());
@@ -93,10 +94,11 @@ class LocalResourcesTrackerImpl implemen
|| ResourceState.DOWNLOADING.equals(rsrc.getState())
|| rsrc != rem) {
// internal error
- LOG.error("Attempt to remove resource with non-zero refcount");
+ LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
assert false;
return false;
}
+ localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), rsrc.getLocalPath());
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Tue Sep 13 22:49:27 2011
@@ -120,7 +120,8 @@ public class LocalizedResource implement
for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")");
}
- sb.append("],").append(getTimestamp()).append("}");
+ sb.append("],").append(getTimestamp()).append(",")
+ .append(getState()).append("}");
return sb.toString();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Sep 13 22:49:27 2011
@@ -22,6 +22,7 @@ import java.io.File;
import java.net.URISyntaxException;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -50,18 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_MAX_PUBLIC_FETCH_THREADS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_CACHE_CLEANUP_MS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCALIZER_BIND_ADDRESS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_TARGET_CACHE_MB;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_CACHE_CLEANUP_MS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCALIZER_BIND_ADDRESS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_MAX_PUBLIC_FETCH_THREADS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_TARGET_CACHE_MB;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -75,7 +65,6 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -91,7 +80,6 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -106,7 +94,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -114,6 +102,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
@@ -154,7 +143,7 @@ public class ResourceLocalizationService
this.exec = exec;
this.dispatcher = dispatcher;
this.delService = delService;
- this.localDirsSelector = new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
+ this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
}
@@ -174,7 +163,7 @@ public class ResourceLocalizationService
// TODO queue deletions here, rather than NM init?
FileContext lfs = getLocalFileContext(conf);
String[] sLocalDirs =
- conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
+ conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
localDirs = new ArrayList<Path>(sLocalDirs.length);
logDirs = new ArrayList<Path>(sLocalDirs.length);
@@ -193,7 +182,7 @@ public class ResourceLocalizationService
lfs.mkdir(sysdir, NM_PRIVATE_PERM, true);
sysDirs.add(sysdir);
}
- String[] sLogdirs = conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
+ String[] sLogdirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
for (String sLogdir : sLogdirs) {
Path logdir = new Path(sLogdir);
logDirs.add(logdir);
@@ -206,12 +195,12 @@ public class ResourceLocalizationService
logDirs = Collections.unmodifiableList(logDirs);
sysDirs = Collections.unmodifiableList(sysDirs);
cacheTargetSize =
- conf.getLong(NM_TARGET_CACHE_MB, DEFAULT_NM_TARGET_CACHE_MB) << 20;
+ conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
cacheCleanupPeriod =
- conf.getLong(NM_CACHE_CLEANUP_MS, DEFAULT_NM_CACHE_CLEANUP_MS);
+ conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = NetUtils.createSocketAddr(
- conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
- localizerTracker = new LocalizerTracker(conf);
+ conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+ localizerTracker = createLocalizerTracker(conf);
dispatcher.register(LocalizerEventType.class, localizerTracker);
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
@@ -231,6 +220,10 @@ public class ResourceLocalizationService
super.start();
}
+ LocalizerTracker createLocalizerTracker(Configuration conf) {
+ return new LocalizerTracker(conf);
+ }
+
Server createServer() {
YarnRPC rpc = YarnRPC.create(getConfig());
Configuration conf = new Configuration(getConfig()); // Clone to separate
@@ -244,8 +237,8 @@ public class ResourceLocalizationService
return rpc.getServer(LocalizationProtocol.class, this,
localizationServerAddress, conf, secretManager,
- conf.getInt(NMConfig.NM_LOCALIZATION_THREADS,
- NMConfig.DEFAULT_NM_LOCALIZATION_THREADS));
+ conf.getInt(YarnConfiguration.NM_LOCALIZER_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT));
}
@@ -265,6 +258,9 @@ public class ResourceLocalizationService
public void handle(LocalizationEvent event) {
String userName;
String appIDStr;
+ Container c;
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
+ LocalResourcesTracker tracker;
// TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) {
case INIT_APPLICATION_RESOURCES:
@@ -289,28 +285,17 @@ public class ResourceLocalizationService
case INIT_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event;
- Container c = rsrcReqs.getContainer();
+ c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials());
- final LocalResourcesTracker tracker;
- LocalResourceVisibility vis = rsrcReqs.getVisibility();
- switch (vis) {
- default:
- case PUBLIC:
- tracker = publicRsrc;
- break;
- case PRIVATE:
- tracker = privateRsrc.get(c.getUser());
- break;
- case APPLICATION:
- tracker =
- appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
- break;
- }
- // We get separate events one each for all resources of one visibility. So
- // all the resources in this event are of the same visibility.
- for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
- tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
+ rsrcs = rsrcReqs.getRequestedResources();
+ for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+ rsrcs.entrySet()) {
+ tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
+ c.getContainerID().getApplicationAttemptId().getApplicationId());
+ for (LocalResourceRequest req : e.getValue()) {
+ tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
+ }
}
break;
case CACHE_CLEANUP:
@@ -324,14 +309,25 @@ public class ResourceLocalizationService
}
break;
case CLEANUP_CONTAINER_RESOURCES:
- Container container =
- ((ContainerLocalizationEvent)event).getContainer();
+ ContainerLocalizationCleanupEvent rsrcCleanup =
+ (ContainerLocalizationCleanupEvent) event;
+ c = rsrcCleanup.getContainer();
+ rsrcs = rsrcCleanup.getResources();
+ for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+ rsrcs.entrySet()) {
+ tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
+ c.getContainerID().getApplicationAttemptId().getApplicationId());
+ for (LocalResourceRequest req : e.getValue()) {
+ tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
+ }
+ }
// Delete the container directories
- userName = container.getUser();
- String containerIDStr = container.toString();
+ userName = c.getUser();
+ String containerIDStr = c.toString();
appIDStr =
- ConverterUtils.toString(container.getContainerID().getAppId());
+ ConverterUtils.toString(
+ c.getContainerID().getApplicationAttemptId().getApplicationId());
for (Path localDir : localDirs) {
// Delete the user-owned container-dir
@@ -349,8 +345,7 @@ public class ResourceLocalizationService
delService.delete(null, containerSysDir, new Path[] {});
}
- dispatcher.getEventHandler().handle(new ContainerEvent(
- container.getContainerID(),
+ dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
break;
case DESTROY_APPLICATION_RESOURCES:
@@ -392,6 +387,19 @@ public class ResourceLocalizationService
}
}
+ LocalResourcesTracker getLocalResourcesTracker(
+ LocalResourceVisibility visibility, String user, ApplicationId appId) {
+ switch (visibility) {
+ default:
+ case PUBLIC:
+ return publicRsrc;
+ case PRIVATE:
+ return privateRsrc.get(user);
+ case APPLICATION:
+ return appRsrc.get(ConverterUtils.toString(appId));
+ }
+ }
+
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@@ -496,7 +504,7 @@ public class ResourceLocalizationService
PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
Executors.newFixedThreadPool(conf.getInt(
- NM_MAX_PUBLIC_FETCH_THREADS, DEFAULT_MAX_PUBLIC_FETCH_THREADS)),
+ YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT)),
new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
}
@@ -539,6 +547,7 @@ public class ResourceLocalizationService
}
@Override
+ @SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
try {
// TODO shutdown, better error handling esp. DU
@@ -664,6 +673,7 @@ public class ResourceLocalizationService
}
// TODO this sucks. Fix it later
+ @SuppressWarnings("unchecked") // dispatcher not typed
LocalizerHeartbeatResponse update(
List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
@@ -780,7 +790,9 @@ public class ResourceLocalizationService
// 2) exec initApplication and wait
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
- ConverterUtils.toString(context.getContainerId().getAppId()),
+ ConverterUtils.toString(
+ context.getContainerId().
+ getApplicationAttemptId().getApplicationId()),
localizerId, localDirs);
// TODO handle ExitCodeException separately?
} catch (Exception e) {
@@ -808,6 +820,7 @@ public class ResourceLocalizationService
}
@Override
+ @SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
dispatcher.getEventHandler().handle(
new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP));
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Tue Sep 13 22:49:27 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.util.Collection;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -26,27 +27,23 @@ import org.apache.hadoop.yarn.server.nod
public class ContainerLocalizationRequestEvent extends
ContainerLocalizationEvent {
- private final LocalResourceVisibility vis;
- private final Collection<LocalResourceRequest> reqs;
+ private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ rsrc;
/**
- * Event requesting the localization of the reqs all with visibility vis
+ * Event requesting the localization of the rsrc.
* @param c
- * @param reqs
- * @param vis
+ * @param rsrc
*/
public ContainerLocalizationRequestEvent(Container c,
- Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) {
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
- this.vis = vis;
- this.reqs = reqs;
+ this.rsrc = rsrc;
}
- public LocalResourceVisibility getVisibility() {
- return vis;
+ public
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ getRequestedResources() {
+ return rsrc;
}
-
- public Collection<LocalResourceRequest> getRequestedResources() {
- return reqs;
- }
-}
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java Tue Sep 13 22:49:27 2011
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
-import java.net.URISyntaxException;
-
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
@@ -26,8 +24,8 @@ public class ResourceReleaseEvent extend
private final ContainerId container;
- public ResourceReleaseEvent(LocalResourceRequest rsrc, ContainerId container)
- throws URISyntaxException {
+ public ResourceReleaseEvent(LocalResourceRequest rsrc,
+ ContainerId container) {
super(rsrc, ResourceEventType.RELEASE);
this.container = container;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java Tue Sep 13 22:49:27 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class AggregatedLogFormat {
@@ -89,8 +90,11 @@ public class AggregatedLogFormat {
public void write(DataOutputStream out) throws IOException {
for (String rootLogDir : this.rootLogDirs) {
File appLogDir =
- new File(rootLogDir, ConverterUtils.toString(this.containerId
- .getAppId()));
+ new File(rootLogDir,
+ ConverterUtils.toString(
+ this.containerId.getApplicationAttemptId().
+ getApplicationId())
+ );
File containerLogDir =
new File(appLogDir, ConverterUtils.toString(this.containerId));
@@ -148,8 +152,8 @@ public class AggregatedLogFormat {
// 256KB minBlockSize : Expected log size for each container too
this.writer =
new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
- LogAggregationService.LOG_COMPRESSION_TYPE,
- LogAggregationService.DEFAULT_COMPRESSION_TYPE), null, conf);
+ YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
}
public void append(LogKey logKey, LogValue logValue) throws IOException {