You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/09/14 00:49:38 UTC

svn commit: r1170378 [6/12] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-clie...

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Tue Sep 13 22:49:27 2011
@@ -28,21 +28,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.service.AbstractService;
 
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.*;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class DeletionService extends AbstractService {
-
   static final Log LOG = LogFactory.getLog(DeletionService.class);
-  /** Delay before deleting resource to ease debugging of NM issues */
-  static final String DEBUG_DELAY_SEC =
-    NMConfig.NM_PREFIX + "debug.delete.delay";
-
   private int debugDelay;
   private final ContainerExecutor exec;
   private ScheduledThreadPoolExecutor sched;
@@ -79,10 +73,10 @@ public class DeletionService extends Abs
   public void init(Configuration conf) {
     if (conf != null) {
       sched = new ScheduledThreadPoolExecutor(
-          conf.getInt(NM_MAX_DELETE_THREADS, DEFAULT_MAX_DELETE_THREADS));
-      debugDelay = conf.getInt(DEBUG_DELAY_SEC, 0);
+          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT));
+      debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
     } else {
-      sched = new ScheduledThreadPoolExecutor(DEFAULT_MAX_DELETE_THREADS);
+      sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT);
     }
     sched.setKeepAliveTime(60L, SECONDS);
     super.init(conf);

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue Sep 13 22:49:27 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.util.Shell.Exit
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@@ -44,8 +45,6 @@ public class LinuxContainerExecutor exte
       .getLog(LinuxContainerExecutor.class);
 
   private String containerExecutorExe;
-  protected static final String CONTAINER_EXECUTOR_EXEC_KEY =
-    NMConfig.NM_PREFIX + "linux-container-executor.path";
   
   @Override
   public void setConf(Configuration conf) {
@@ -98,7 +97,7 @@ public class LinuxContainerExecutor exte
       new File(hadoopBin, "container-executor").getAbsolutePath();
     return null == conf
       ? defaultPath
-      : conf.get(CONTAINER_EXECUTOR_EXEC_KEY, defaultPath);
+      : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Sep 13 22:49:27 2011
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_CONTAINER_EXECUTOR_CLASS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_KEYTAB;
-
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -32,6 +29,7 @@ import org.apache.hadoop.NodeHealthCheck
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
@@ -42,27 +40,29 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 
 public class NodeManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(NodeManager.class);
   protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+  protected ContainerTokenSecretManager containerTokenSecretManager;
 
   public NodeManager() {
     super(NodeManager.class.getName());
   }
 
   protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+      ContainerTokenSecretManager containerTokenSecretManager) {
     return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
-                                     metrics);
+                                     metrics, containerTokenSecretManager);
   }
 
   protected NodeResourceMonitor createNodeResourceMonitor() {
@@ -71,9 +71,10 @@ public class NodeManager extends Composi
 
   protected ContainerManagerImpl createContainerManager(Context context,
       ContainerExecutor exec, DeletionService del,
-      NodeStatusUpdater nodeStatusUpdater) {
+      NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager 
+      containerTokenSecretManager) {
     return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
-                                    metrics);
+                                    metrics, containerTokenSecretManager);
   }
 
   protected WebServer createWebServer(Context nmContext,
@@ -82,8 +83,8 @@ public class NodeManager extends Composi
   }
 
   protected void doSecureLogin() throws IOException {
-    SecurityUtil.login(getConfig(), NM_KEYTAB,
-        YarnServerConfig.NM_SERVER_PRINCIPAL_KEY);
+    SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
+        YarnConfiguration.NM_PRINCIPAL);
   }
 
   @Override
@@ -91,8 +92,15 @@ public class NodeManager extends Composi
 
     Context context = new NMContext();
 
+    // Create the secretManager if need be.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Security is enabled on NodeManager. "
+          + "Creating ContainerTokenSecretManager");
+      this.containerTokenSecretManager = new ContainerTokenSecretManager();
+    }
+
     ContainerExecutor exec = ReflectionUtils.newInstance(
-        conf.getClass(NM_CONTAINER_EXECUTOR_CLASS,
+        conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
           DefaultContainerExecutor.class, ContainerExecutor.class), conf);
     DeletionService del = new DeletionService(exec);
     addService(del);
@@ -106,18 +114,16 @@ public class NodeManager extends Composi
       addService(healthChecker);
     }
 
-    // StatusUpdater should be added first so that it can start first. Once it
-    // contacts RM, does registration and gets tokens, then only
-    // ContainerManager can start.
     NodeStatusUpdater nodeStatusUpdater =
-        createNodeStatusUpdater(context, dispatcher, healthChecker);
-    addService(nodeStatusUpdater);
+        createNodeStatusUpdater(context, dispatcher, healthChecker, 
+        this.containerTokenSecretManager);
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
     addService(nodeResourceMonitor);
 
     ContainerManagerImpl containerManager =
-        createContainerManager(context, exec, del, nodeStatusUpdater);
+        createContainerManager(context, exec, del, nodeStatusUpdater,
+        this.containerTokenSecretManager);
     addService(containerManager);
 
     Service webServer =
@@ -136,6 +142,10 @@ public class NodeManager extends Composi
 
     DefaultMetricsSystem.initialize("NodeManager");
 
+    // StatusUpdater should be added last so that it get started last 
+    // so that we make sure everything is up before registering with RM. 
+    addService(nodeStatusUpdater);
+
     super.init(conf);
     // TODO add local dirs to del
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Sep 13 22:49:27 2011
@@ -30,13 +30,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -69,12 +69,12 @@ public class NodeStatusUpdaterImpl exten
   private final Context context;
   private final Dispatcher dispatcher;
 
+  private ContainerTokenSecretManager containerTokenSecretManager;
   private long heartBeatInterval;
   private ResourceTracker resourceTracker;
   private String rmAddress;
   private Resource totalResource;
   private String containerManagerBindAddress;
-  private String nodeHttpAddress;
   private String hostName;
   private int containerManagerPort;
   private int httpPort;
@@ -87,23 +87,25 @@ public class NodeStatusUpdaterImpl exten
   private final NodeManagerMetrics metrics;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
-      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, 
+      ContainerTokenSecretManager containerTokenSecretManager) {
     super(NodeStatusUpdaterImpl.class.getName());
     this.healthChecker = healthChecker;
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
+    this.containerTokenSecretManager = containerTokenSecretManager;
   }
 
   @Override
   public synchronized void init(Configuration conf) {
     this.rmAddress =
-        conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
-            YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
+        conf.get(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+            YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
     this.heartBeatInterval =
-        conf.getLong(NMConfig.HEARTBEAT_INTERVAL,
-            NMConfig.DEFAULT_HEARTBEAT_INTERVAL);
-    int memory = conf.getInt(NMConfig.NM_VMEM_GB, NMConfig.DEFAULT_NM_VMEM_GB);
+        conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
+    int memory = conf.getInt(YarnConfiguration.NM_VMEM_GB, YarnConfiguration.DEFAULT_NM_VMEM_GB);
     this.totalResource = recordFactory.newRecordInstance(Resource.class);
     this.totalResource.setMemory(memory * 1024);
     metrics.addResource(totalResource);
@@ -113,13 +115,13 @@ public class NodeStatusUpdaterImpl exten
   @Override
   public void start() {
     String cmBindAddressStr =
-        getConfig().get(NMConfig.NM_BIND_ADDRESS,
-            NMConfig.DEFAULT_NM_BIND_ADDRESS);
+        getConfig().get(YarnConfiguration.NM_ADDRESS,
+            YarnConfiguration.DEFAULT_NM_ADDRESS);
     InetSocketAddress cmBindAddress =
         NetUtils.createSocketAddr(cmBindAddressStr);
     String httpBindAddressStr =
-      getConfig().get(NMConfig.NM_HTTP_BIND_ADDRESS,
-          NMConfig.DEFAULT_NM_HTTP_BIND_ADDRESS);
+      getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
+          YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
     InetSocketAddress httpBindAddress =
       NetUtils.createSocketAddr(httpBindAddressStr);
     try {
@@ -128,7 +130,6 @@ public class NodeStatusUpdaterImpl exten
       this.httpPort = httpBindAddress.getPort();
       this.containerManagerBindAddress =
           this.hostName + ":" + this.containerManagerPort;
-      this.nodeHttpAddress = this.hostName + ":" + this.httpPort;
       LOG.info("Configured ContainerManager Address is "
           + this.containerManagerBindAddress);
       // Registration has to be in start so that ContainerManager can get the
@@ -176,8 +177,18 @@ public class NodeStatusUpdaterImpl exten
       this.secretKeyBytes = regResponse.getSecretKey().array();
     }
 
+    // do this now so that its set before we start heartbeating to RM
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Security enabled - updating secret keys now");
+      // It is expected that status updater is started by this point and
+      // RM gives the shared secret in registration during StatusUpdater#start().
+      this.containerTokenSecretManager.setSecretKey(
+          this.getContainerManagerBindAddress(),
+          this.getRMNMSharedSecret());
+    }
     LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress
         + " with total resource of " + this.totalResource);
+
   }
 
   @Override
@@ -196,35 +207,28 @@ public class NodeStatusUpdaterImpl exten
     nodeStatus.setNodeId(this.nodeId);
 
     int numActiveContainers = 0;
+    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
       ContainerId containerId = e.getKey();
       Container container = e.getValue();
 
-      List<org.apache.hadoop.yarn.api.records.Container> applicationContainers = nodeStatus
-          .getContainers(container.getContainerID().getAppId());
-      if (applicationContainers == null) {
-        applicationContainers = new ArrayList<org.apache.hadoop.yarn.api.records.Container>();
-        nodeStatus.setContainers(container.getContainerID().getAppId(),
-            applicationContainers);
-      }
-
       // Clone the container to send it to the RM
-      org.apache.hadoop.yarn.api.records.Container c = container.cloneAndGetContainer();
-      c.setNodeId(this.nodeId);
-      c.setNodeHttpAddress(this.nodeHttpAddress); // TODO: don't set everytime.
-      applicationContainers.add(c);
+      org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = 
+          container.cloneAndGetContainerStatus();
+      containersStatuses.add(containerStatus);
       ++numActiveContainers;
-      LOG.info("Sending out status for container: " + c);
+      LOG.info("Sending out status for container: " + containerStatus);
 
-      if (c.getState() == ContainerState.COMPLETE) {
+      if (containerStatus.getState() == ContainerState.COMPLETE) {
         // Remove
         i.remove();
 
         LOG.info("Removed completed container " + containerId);
       }
     }
+    nodeStatus.setContainersStatuses(containersStatuses);
 
     LOG.debug(this.containerManagerBindAddress + " sending out status for " + numActiveContainers
         + " containers");

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Tue Sep 13 22:49:27 2011
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.Service;
@@ -40,9 +42,6 @@ public class AuxServices extends Abstrac
 
   private static final Log LOG = LogFactory.getLog(AuxServices.class);
 
-  public static final String AUX_SERVICES = "nodemanager.auxiluary.services";
-  public static final String AUX_SERVICE_CLASS_FMT =
-    "nodemanager.aux.service.%s.class";
   public final Map<String,AuxiliaryService> serviceMap;
   public final Map<String,ByteBuffer> serviceMeta;
 
@@ -72,16 +71,24 @@ public class AuxServices extends Abstrac
    * the the name of the service as defined in the configuration.
    */
   public Map<String, ByteBuffer> getMeta() {
-    return Collections.unmodifiableMap(serviceMeta);
+    Map<String, ByteBuffer> metaClone = new HashMap<String, ByteBuffer>(
+        serviceMeta.size());
+    synchronized (serviceMeta) {
+      for (Entry<String, ByteBuffer> entry : serviceMeta.entrySet()) {
+        metaClone.put(entry.getKey(), entry.getValue().duplicate());
+      }
+    }
+    return metaClone;
   }
 
   @Override
   public void init(Configuration conf) {
-    Collection<String> auxNames = conf.getStringCollection(AUX_SERVICES);
+    Collection<String> auxNames = conf.getStringCollection(
+        YarnConfiguration.NM_AUX_SERVICES);
     for (final String sName : auxNames) {
       try {
         Class<? extends AuxiliaryService> sClass = conf.getClass(
-              String.format(AUX_SERVICE_CLASS_FMT, sName), null,
+              String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null,
               AuxiliaryService.class);
         if (null == sClass) {
           throw new RuntimeException("No class defiend for " + sName);

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Sep 13 22:49:27 2011
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
 import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
 
 import java.io.IOException;
@@ -31,7 +29,6 @@ import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -67,7 +64,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -120,7 +116,8 @@ public class ContainerManagerImpl extend
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
-      NodeManagerMetrics metrics) {
+      NodeManagerMetrics metrics, ContainerTokenSecretManager 
+      containerTokenSecretManager) {
     super(ContainerManagerImpl.class.getName());
     this.context = context;
     dispatcher = new AsyncDispatcher();
@@ -135,12 +132,7 @@ public class ContainerManagerImpl extend
     addService(containersLauncher);
 
     this.nodeStatusUpdater = nodeStatusUpdater;
-    // Create the secretManager if need be.
-    if (UserGroupInformation.isSecurityEnabled()) {
-      LOG.info("Security is enabled on NodeManager. "
-          + "Creating ContainerTokenSecretManager");
-      this.containerTokenSecretManager = new ContainerTokenSecretManager();
-    }
+    this.containerTokenSecretManager = containerTokenSecretManager;
 
     // Start configurable services
     auxiluaryServices = new AuxServices();
@@ -190,7 +182,7 @@ public class ContainerManagerImpl extend
   @Override
   public void init(Configuration conf) {
     cmBindAddressStr = NetUtils.createSocketAddr(
-        conf.get(NM_BIND_ADDRESS, DEFAULT_NM_BIND_ADDRESS));
+        conf.get(YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS));
     super.init(conf);
   }
 
@@ -200,22 +192,14 @@ public class ContainerManagerImpl extend
     // Enqueue user dirs in deletion context
 
     YarnRPC rpc = YarnRPC.create(getConfig());
-    if (UserGroupInformation.isSecurityEnabled()) {
-      // This is fine as status updater is started before ContainerManager and
-      // RM gives the shared secret in registration during StatusUpdter#start()
-      // itself.
-      this.containerTokenSecretManager.setSecretKey(
-          this.nodeStatusUpdater.getContainerManagerBindAddress(),
-          this.nodeStatusUpdater.getRMNMSharedSecret());
-    }
     Configuration cmConf = new Configuration(getConfig());
     cmConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
         ContainerManagerSecurityInfo.class, SecurityInfo.class);
     server =
         rpc.getServer(ContainerManager.class, this, cmBindAddressStr, cmConf,
             this.containerTokenSecretManager,
-            cmConf.getInt(NMConfig.NM_CONTAINER_MGR_THREADS, 
-                NMConfig.DEFAULT_NM_CONTAINER_MGR_THREADS));
+            cmConf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 
+                YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
     LOG.info("ContainerManager started at " + cmBindAddressStr);
     server.start();
     super.start();
@@ -266,7 +250,8 @@ public class ContainerManagerImpl extend
     Container container =
         new ContainerImpl(this.dispatcher, launchContext, credentials, metrics);
     ContainerId containerID = launchContext.getContainerId();
-    ApplicationId applicationID = containerID.getAppId();
+    ApplicationId applicationID = 
+        containerID.getApplicationAttemptId().getApplicationId();
     if (context.getContainers().putIfAbsent(containerID, container) != null) {
       NMAuditLogger.logFailure(launchContext.getUser(), 
           AuditConstants.START_CONTAINER, "ContainerManagerImpl",
@@ -311,10 +296,18 @@ public class ContainerManagerImpl extend
     Container container = this.context.getContainers().get(containerID);
     if (container == null) {
       LOG.warn("Trying to stop unknown container " + containerID);
-      NMAuditLogger.logFailure(container.getUser(),
+      String userName;
+      try {
+        userName = UserGroupInformation.getCurrentUser().getUserName();
+      } catch (IOException e) {
+        LOG.error("Error finding userName", e);
+        return response;
+      }
+      NMAuditLogger.logFailure(userName,
           AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
           "Trying to stop unknown container!",
-          containerID.getAppId(), containerID);
+          containerID.getApplicationAttemptId().getApplicationId(), 
+          containerID);
       return response; // Return immediately.
     }
     dispatcher.getEventHandler().handle(
@@ -326,7 +319,8 @@ public class ContainerManagerImpl extend
     // should be the same or should be rejected by auth before here. 
     NMAuditLogger.logSuccess(container.getUser(), 
         AuditConstants.STOP_CONTAINER, "ContainerManageImpl", 
-        containerID.getAppId(), containerID);
+        containerID.getApplicationAttemptId().getApplicationId(), 
+        containerID);
 
     // TODO: Move this code to appropriate place once kill_container is
     // implemented.

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java Tue Sep 13 22:49:27 2011
@@ -25,7 +25,7 @@ public class ApplicationContainerFinishe
 
   public ApplicationContainerFinishedEvent(
       ContainerId containerID) {
-    super(containerID.getAppId(),
+    super(containerID.getApplicationAttemptId().getApplicationId(),
         ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
     this.containerID = containerID;
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java Tue Sep 13 22:49:27 2011
@@ -25,7 +25,7 @@ public class ApplicationInitEvent extend
   private final Container container;
 
   public ApplicationInitEvent(Container container) {
-    super(container.getContainerID().getAppId(),
+    super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
         ApplicationEventType.INIT_APPLICATION);
     this.container = container;
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Tue Sep 13 22:49:27 2011
@@ -40,8 +40,6 @@ public interface Container extends Event
 
   Map<Path,String> getLocalizedResources();
 
-  org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer();
-
   ContainerStatus cloneAndGetContainerStatus();
 
   String toString();

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Sep 13 22:49:27 2011
@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.no
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -50,9 +53,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@@ -72,7 +74,7 @@ public class ContainerImpl implements Co
   private final Credentials credentials;
   private final NodeManagerMetrics metrics;
   private final ContainerLaunchContext launchContext;
-  private String exitCode = "NA";
+  private int exitCode = YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS;
   private final StringBuilder diagnostics;
 
   private static final Log LOG = LogFactory.getLog(Container.class);
@@ -81,6 +83,12 @@ public class ContainerImpl implements Co
     new HashMap<LocalResourceRequest,String>();
   private final Map<Path,String> localizedResources =
     new HashMap<Path,String>();
+  private final List<LocalResourceRequest> publicRsrcs =
+    new ArrayList<LocalResourceRequest>();
+  private final List<LocalResourceRequest> privateRsrcs =
+    new ArrayList<LocalResourceRequest>();
+  private final List<LocalResourceRequest> appRsrcs =
+    new ArrayList<LocalResourceRequest>();
 
   public ContainerImpl(Dispatcher dispatcher,
       ContainerLaunchContext launchContext, Credentials creds,
@@ -318,24 +326,6 @@ public class ContainerImpl implements Co
   }
 
   @Override
-  public
-      org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() {
-    this.readLock.lock();
-    try {
-      org.apache.hadoop.yarn.api.records.Container c =
-        recordFactory.newRecordInstance(
-            org.apache.hadoop.yarn.api.records.Container.class);
-      c.setId(this.launchContext.getContainerId());
-      c.setResource(this.launchContext.getResource());
-      c.setState(getCurrentState());
-      c.setContainerStatus(cloneAndGetContainerStatus());
-      return c;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  @Override
   public ContainerLaunchContext getLaunchContext() {
     this.readLock.lock();
     try {
@@ -354,14 +344,14 @@ public class ContainerImpl implements Co
       containerStatus.setState(getCurrentState());
       containerStatus.setContainerId(this.launchContext.getContainerId());
       containerStatus.setDiagnostics(diagnostics.toString());
-  	  containerStatus.setExitStatus(String.valueOf(exitCode));
+  	  containerStatus.setExitStatus(exitCode);
       return containerStatus;
     } finally {
       this.readLock.unlock();
     }
   }
 
-  @SuppressWarnings("fallthrough")
+  @SuppressWarnings({"fallthrough", "unchecked"})
   private void finished() {
     switch (getContainerState()) {
       case EXITED_WITH_SUCCESS:
@@ -369,7 +359,8 @@ public class ContainerImpl implements Co
         metrics.completedContainer();
         NMAuditLogger.logSuccess(getUser(),
             AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
-            getContainerID().getAppId(), getContainerID());
+            getContainerID().getApplicationAttemptId().getApplicationId(), 
+            getContainerID());
         break;
       case EXITED_WITH_FAILURE:
         metrics.endRunningContainer();
@@ -379,7 +370,8 @@ public class ContainerImpl implements Co
         NMAuditLogger.logFailure(getUser(),
             AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
             "Container failed with state: " + getContainerState(),
-            getContainerID().getAppId(), getContainerID());
+            getContainerID().getApplicationAttemptId().getApplicationId(), 
+            getContainerID());
         break;
       case CONTAINER_CLEANEDUP_AFTER_KILL:
         metrics.endRunningContainer();
@@ -388,13 +380,15 @@ public class ContainerImpl implements Co
         metrics.killedContainer();
         NMAuditLogger.logSuccess(getUser(),
             AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
-            getContainerID().getAppId(), getContainerID());
+            getContainerID().getApplicationAttemptId().getApplicationId(), 
+            getContainerID());
     }
 
     metrics.releaseContainer(getLaunchContext().getResource());
 
     // Inform the application
     ContainerId containerID = getContainerID();
+    @SuppressWarnings("rawtypes")
     EventHandler eventHandler = dispatcher.getEventHandler();
     eventHandler.handle(new ApplicationContainerFinishedEvent(containerID));
     // Remove the container from the resource-monitor
@@ -404,6 +398,24 @@ public class ContainerImpl implements Co
         containerID, exitCode));
   }
 
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void cleanup() {
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
+      new HashMap<LocalResourceVisibility, 
+                  Collection<LocalResourceRequest>>();
+    if (!publicRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
+    }
+    if (!privateRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
+    }
+    if (!appRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
+    }
+    dispatcher.getEventHandler().handle(
+        new ContainerLocalizationCleanupEvent(this, rsrc));
+  }
+
   static class ContainerTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
 
@@ -424,27 +436,22 @@ public class ContainerImpl implements Co
       container.metrics.initingContainer();
 
       // Inform the AuxServices about the opaque serviceData
-      Map<String,ByteBuffer> csd = ctxt.getAllServiceData();
+      Map<String,ByteBuffer> csd = ctxt.getServiceData();
       if (csd != null) {
         // This can happen more than once per Application as each container may
         // have distinct service data
         for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
           container.dispatcher.getEventHandler().handle(
               new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
-                ctxt.getUser(), ctxt.getContainerId().getAppId(),
+                ctxt.getUser(), 
+                ctxt.getContainerId().getApplicationAttemptId().getApplicationId(),
                 service.getKey().toString(), service.getValue()));
         }
       }
 
       // Send requests for public, private resources
-      Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources();
+      Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
       if (!cntrRsrc.isEmpty()) {
-        ArrayList<LocalResourceRequest> publicRsrc =
-          new ArrayList<LocalResourceRequest>();
-        ArrayList<LocalResourceRequest> privateRsrc =
-          new ArrayList<LocalResourceRequest>();
-        ArrayList<LocalResourceRequest> appRsrc =
-          new ArrayList<LocalResourceRequest>();
         try {
           for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
             try {
@@ -453,13 +460,13 @@ public class ContainerImpl implements Co
             container.pendingResources.put(req, rsrc.getKey());
             switch (rsrc.getValue().getVisibility()) {
             case PUBLIC:
-              publicRsrc.add(req);
+              container.publicRsrcs.add(req);
               break;
             case PRIVATE:
-              privateRsrc.add(req);
+              container.privateRsrcs.add(req);
               break;
             case APPLICATION:
-              appRsrc.add(req);
+              container.appRsrcs.add(req);
               break;
             }
             } catch (URISyntaxException e) {
@@ -471,27 +478,25 @@ public class ContainerImpl implements Co
         } catch (URISyntaxException e) {
           // malformed resource; abort container launch
           LOG.warn("Failed to parse resource-request", e);
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationEvent(
-               LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+          container.cleanup();
           container.metrics.endInitingContainer();
           return ContainerState.LOCALIZATION_FAILED;
         }
-        if (!publicRsrc.isEmpty()) {
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationRequestEvent(
-                container, publicRsrc, LocalResourceVisibility.PUBLIC));
+        Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+            new HashMap<LocalResourceVisibility, 
+                        Collection<LocalResourceRequest>>();
+        if (!container.publicRsrcs.isEmpty()) {
+          req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
         }
-        if (!privateRsrc.isEmpty()) {
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationRequestEvent(
-                container, privateRsrc, LocalResourceVisibility.PRIVATE));
+        if (!container.privateRsrcs.isEmpty()) {
+          req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
         }
-        if (!appRsrc.isEmpty()) {
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationRequestEvent(
-                container, appRsrc, LocalResourceVisibility.APPLICATION));
+        if (!container.appRsrcs.isEmpty()) {
+          req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
         }
+        
+        container.dispatcher.getEventHandler().handle(
+              new ContainerLocalizationRequestEvent(container, req));
         return ContainerState.LOCALIZING;
       } else {
         container.dispatcher.getEventHandler().handle(
@@ -546,7 +551,6 @@ public class ContainerImpl implements Co
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ExitedWithSuccessTransition extends ContainerTransition {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
@@ -554,31 +558,25 @@ public class ContainerImpl implements Co
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ExitedWithFailureTransition extends ContainerTransition {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
-      container.exitCode = String.valueOf(exitEvent.getExitCode());
+      container.exitCode = exitEvent.getExitCode();
 
       // TODO: Add containerWorkDir to the deletion service.
       // TODO: Add containerOuputDir to the deletion service.
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ResourceFailedTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -592,30 +590,24 @@ public class ContainerImpl implements Co
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
       container.metrics.endInitingContainer();
     }
   }
   
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class KillDuringLocalizationTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
       container.metrics.endInitingContainer();
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LocalizedResourceDuringKillTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -647,23 +639,19 @@ public class ContainerImpl implements Co
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ContainerKilledTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
-      container.exitCode = String.valueOf(exitEvent.getExitCode());
+      container.exitCode = exitEvent.getExitCode();
 
       // The process/process-grp is killed. Decrement reference counts and
       // cleanup resources
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ContainerDoneTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -697,7 +685,8 @@ public class ContainerImpl implements Co
         newState =
             stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.warn("Can't handle this event at current state", e);
+        LOG.warn("Can't handle this event at current state: Current: ["
+            + oldState + "], eventType: [" + event.getType() + "]", e);
       }
       if (oldState != newState) {
         LOG.info("Container " + containerID + " transitioned from "

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue Sep 13 22:49:27 2011
@@ -45,10 +45,10 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -79,7 +79,7 @@ public class ContainerLaunch implements 
     this.exec = exec;
     this.container = container;
     this.dispatcher = dispatcher;
-    this.logDirsSelector = new LocalDirAllocator(NMConfig.NM_LOG_DIR);
+    this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
   }
 
   @Override
@@ -89,8 +89,8 @@ public class ContainerLaunch implements 
     final Map<Path,String> localResources = container.getLocalizedResources();
     String containerIdStr = ConverterUtils.toString(container.getContainerID());
     final String user = launchContext.getUser();
-    final Map<String,String> env = launchContext.getAllEnv();
-    final List<String> command = launchContext.getCommandList();
+    final Map<String,String> env = launchContext.getEnv();
+    final List<String> command = launchContext.getCommands();
     int ret = -1;
 
     try {
@@ -107,10 +107,9 @@ public class ContainerLaunch implements 
         newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
             containerLogDir.toUri().getPath()));
       }
-      launchContext.clearCommands();
-      launchContext.addAllCommands(newCmds);
+      launchContext.setCommands(newCmds);
 
-      Map<String, String> envs = launchContext.getAllEnv();
+      Map<String, String> envs = launchContext.getEnv();
       Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
       for (Entry<String, String> entry : envs.entrySet()) {
         newEnvs.put(
@@ -119,13 +118,12 @@ public class ContainerLaunch implements 
                 ApplicationConstants.LOG_DIR_EXPANSION_VAR,
                 containerLogDir.toUri().getPath()));
       }
-      launchContext.clearEnv();
-      launchContext.addAllEnv(newEnvs);
+      launchContext.setEnv(newEnvs);
       // /////////////////////////// End of variable expansion
 
       FileContext lfs = FileContext.getLocalFSFileContext();
       LocalDirAllocator lDirAllocator =
-          new LocalDirAllocator(NMConfig.NM_LOCAL_DIR); // TODO
+          new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
       Path nmPrivateContainerScriptPath =
           lDirAllocator.getLocalPathForWrite(
               ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
@@ -152,8 +150,8 @@ public class ContainerLaunch implements 
       try {
         // /////////// Write out the container-script in the nmPrivate space.
         String[] localDirs =
-            this.conf.getStrings(NMConfig.NM_LOCAL_DIR,
-                NMConfig.DEFAULT_NM_LOCAL_DIR);
+            this.conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS,
+                YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
         List<Path> appDirs = new ArrayList<Path>(localDirs.length);
         for (String localDir : localDirs) {
           Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@@ -170,7 +168,7 @@ public class ContainerLaunch implements 
             containerWorkDir, FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
 
         writeLaunchEnv(containerScriptOutStream, env, localResources,
-            launchContext.getCommandList(), appDirs);
+            launchContext.getCommands(), appDirs);
         // /////////// End of writing out container-script
 
         // /////////// Write out the container-tokens in the nmPrivate space.

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue Sep 13 22:49:27 2011
@@ -103,7 +103,8 @@ public class ContainersLauncher extends 
     switch (event.getType()) {
       case LAUNCH_CONTAINER:
         Application app =
-          context.getApplications().get(containerId.getAppId());
+          context.getApplications().get(
+              containerId.getApplicationAttemptId().getApplicationId());
       ContainerLaunch launch =
           new ContainerLaunch(getConfig(), dispatcher, exec, app,
               event.getContainer());

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue Sep 13 22:49:27 2011
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -274,7 +273,7 @@ public class ContainerLocalizer {
           stat.setLocalPath(
               ConverterUtils.getYarnUrlFromPath(localPath));
           stat.setLocalSize(
-              FileUtil.getDU(new File(localPath.getParent().toString())));
+              FileUtil.getDU(new File(localPath.getParent().toUri())));
           stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
         } catch (ExecutionException e) {
           stat.setStatus(ResourceStatusType.FETCH_FAILURE);

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Sep 13 22:49:27 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.nod
  * {@link LocalResourceVisibility}.
  * 
  */
+
 class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@@ -83,7 +84,7 @@ class LocalResourcesTrackerImpl implemen
   @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
     // current synchronization guaranteed by crude RLS event for cleanup
-    LocalizedResource rsrc = localrsrc.remove(rem.getRequest());
+    LocalizedResource rsrc = localrsrc.get(rem.getRequest());
     if (null == rsrc) {
       LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
           " from " + getUser());
@@ -93,10 +94,11 @@ class LocalResourcesTrackerImpl implemen
         || ResourceState.DOWNLOADING.equals(rsrc.getState())
         || rsrc != rem) {
       // internal error
-      LOG.error("Attempt to remove resource with non-zero refcount");
+      LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
       assert false;
       return false;
     }
+    localrsrc.remove(rem.getRequest());
     if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
       delService.delete(getUser(), rsrc.getLocalPath());
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Tue Sep 13 22:49:27 2011
@@ -120,7 +120,8 @@ public class LocalizedResource implement
     for (ContainerId c : ref) {
       sb.append("(").append(c.toString()).append(")");
     }
-    sb.append("],").append(getTimestamp()).append("}");
+    sb.append("],").append(getTimestamp()).append(",")
+      .append(getState()).append("}");
     return sb.toString();
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Sep 13 22:49:27 2011
@@ -22,6 +22,7 @@ import java.io.File;
 
 import java.net.URISyntaxException;
 
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -50,18 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_MAX_PUBLIC_FETCH_THREADS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_CACHE_CLEANUP_MS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCALIZER_BIND_ADDRESS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_TARGET_CACHE_MB;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_CACHE_CLEANUP_MS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCALIZER_BIND_ADDRESS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_MAX_PUBLIC_FETCH_THREADS;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_TARGET_CACHE_MB;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -75,7 +65,6 @@ import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -91,7 +80,6 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -106,7 +94,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -114,6 +102,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
@@ -154,7 +143,7 @@ public class ResourceLocalizationService
     this.exec = exec;
     this.dispatcher = dispatcher;
     this.delService = delService;
-    this.localDirsSelector = new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
+    this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
     this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
     this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
   }
@@ -174,7 +163,7 @@ public class ResourceLocalizationService
       // TODO queue deletions here, rather than NM init?
       FileContext lfs = getLocalFileContext(conf);
       String[] sLocalDirs =
-        conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
+        conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
 
       localDirs = new ArrayList<Path>(sLocalDirs.length);
       logDirs = new ArrayList<Path>(sLocalDirs.length);
@@ -193,7 +182,7 @@ public class ResourceLocalizationService
         lfs.mkdir(sysdir, NM_PRIVATE_PERM, true);
         sysDirs.add(sysdir);
       }
-      String[] sLogdirs = conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
+      String[] sLogdirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
       for (String sLogdir : sLogdirs) {
         Path logdir = new Path(sLogdir);
         logDirs.add(logdir);
@@ -206,12 +195,12 @@ public class ResourceLocalizationService
     logDirs = Collections.unmodifiableList(logDirs);
     sysDirs = Collections.unmodifiableList(sysDirs);
     cacheTargetSize =
-      conf.getLong(NM_TARGET_CACHE_MB, DEFAULT_NM_TARGET_CACHE_MB) << 20;
+      conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
     cacheCleanupPeriod =
-      conf.getLong(NM_CACHE_CLEANUP_MS, DEFAULT_NM_CACHE_CLEANUP_MS);
+      conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
     localizationServerAddress = NetUtils.createSocketAddr(
-      conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
-    localizerTracker = new LocalizerTracker(conf);
+      conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+    localizerTracker = createLocalizerTracker(conf);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
     cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
         cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
@@ -231,6 +220,10 @@ public class ResourceLocalizationService
     super.start();
   }
 
+  LocalizerTracker createLocalizerTracker(Configuration conf) {
+    return new LocalizerTracker(conf);
+  }
+
   Server createServer() {
     YarnRPC rpc = YarnRPC.create(getConfig());
     Configuration conf = new Configuration(getConfig()); // Clone to separate
@@ -244,8 +237,8 @@ public class ResourceLocalizationService
     
     return rpc.getServer(LocalizationProtocol.class, this,
         localizationServerAddress, conf, secretManager, 
-        conf.getInt(NMConfig.NM_LOCALIZATION_THREADS, 
-            NMConfig.DEFAULT_NM_LOCALIZATION_THREADS));
+        conf.getInt(YarnConfiguration.NM_LOCALIZER_CLIENT_THREAD_COUNT, 
+            YarnConfiguration.DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT));
 
   }
 
@@ -265,6 +258,9 @@ public class ResourceLocalizationService
   public void handle(LocalizationEvent event) {
     String userName;
     String appIDStr;
+    Container c;
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
+    LocalResourcesTracker tracker;
     // TODO: create log dir as $logdir/$user/$appId
     switch (event.getType()) {
     case INIT_APPLICATION_RESOURCES:
@@ -289,28 +285,17 @@ public class ResourceLocalizationService
     case INIT_CONTAINER_RESOURCES:
       ContainerLocalizationRequestEvent rsrcReqs =
         (ContainerLocalizationRequestEvent) event;
-      Container c = rsrcReqs.getContainer();
+      c = rsrcReqs.getContainer();
       LocalizerContext ctxt = new LocalizerContext(
           c.getUser(), c.getContainerID(), c.getCredentials());
-      final LocalResourcesTracker tracker;
-      LocalResourceVisibility vis = rsrcReqs.getVisibility();
-      switch (vis) {
-      default:
-      case PUBLIC:
-        tracker = publicRsrc;
-        break;
-      case PRIVATE:
-        tracker = privateRsrc.get(c.getUser());
-        break;
-      case APPLICATION:
-        tracker =
-          appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
-        break;
-      }
-      // We get separate events one each for all resources of one visibility. So
-      // all the resources in this event are of the same visibility.
-      for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
-        tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
+      rsrcs = rsrcReqs.getRequestedResources();
+      for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+           rsrcs.entrySet()) {
+        tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), 
+            c.getContainerID().getApplicationAttemptId().getApplicationId());
+        for (LocalResourceRequest req : e.getValue()) {
+          tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
+        }
       }
       break;
     case CACHE_CLEANUP:
@@ -324,14 +309,25 @@ public class ResourceLocalizationService
       }
       break;
     case CLEANUP_CONTAINER_RESOURCES:
-      Container container =
-        ((ContainerLocalizationEvent)event).getContainer();
+      ContainerLocalizationCleanupEvent rsrcCleanup =
+        (ContainerLocalizationCleanupEvent) event;
+      c = rsrcCleanup.getContainer();
+      rsrcs = rsrcCleanup.getResources();
+      for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+           rsrcs.entrySet()) {
+        tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), 
+            c.getContainerID().getApplicationAttemptId().getApplicationId());
+        for (LocalResourceRequest req : e.getValue()) {
+          tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
+        }
+      }
 
       // Delete the container directories
-      userName = container.getUser();
-      String containerIDStr = container.toString();
+      userName = c.getUser();
+      String containerIDStr = c.toString();
       appIDStr =
-        ConverterUtils.toString(container.getContainerID().getAppId());
+        ConverterUtils.toString(
+            c.getContainerID().getApplicationAttemptId().getApplicationId());
       for (Path localDir : localDirs) {
 
         // Delete the user-owned container-dir
@@ -349,8 +345,7 @@ public class ResourceLocalizationService
         delService.delete(null, containerSysDir,  new Path[] {});
       }
 
-      dispatcher.getEventHandler().handle(new ContainerEvent(
-            container.getContainerID(),
+      dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
       break;
     case DESTROY_APPLICATION_RESOURCES:
@@ -392,6 +387,19 @@ public class ResourceLocalizationService
     }
   }
 
+  LocalResourcesTracker getLocalResourcesTracker(
+      LocalResourceVisibility visibility, String user, ApplicationId appId) {
+    switch (visibility) {
+      default:
+      case PUBLIC:
+        return publicRsrc;
+      case PRIVATE:
+        return privateRsrc.get(user);
+      case APPLICATION:
+        return appRsrc.get(ConverterUtils.toString(appId));
+    }
+  }
+
   /**
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    */
@@ -496,7 +504,7 @@ public class ResourceLocalizationService
     PublicLocalizer(Configuration conf) {
       this(conf, getLocalFileContext(conf),
            Executors.newFixedThreadPool(conf.getInt(
-               NM_MAX_PUBLIC_FETCH_THREADS, DEFAULT_MAX_PUBLIC_FETCH_THREADS)),
+               YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT)),
            new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
            new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
     }
@@ -539,6 +547,7 @@ public class ResourceLocalizationService
     }
 
     @Override
+    @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       try {
         // TODO shutdown, better error handling esp. DU
@@ -664,6 +673,7 @@ public class ResourceLocalizationService
     }
 
     // TODO this sucks. Fix it later
+    @SuppressWarnings("unchecked") // dispatcher not typed
     LocalizerHeartbeatResponse update(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
@@ -780,7 +790,9 @@ public class ResourceLocalizationService
         // 2) exec initApplication and wait
         exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
             context.getUser(),
-            ConverterUtils.toString(context.getContainerId().getAppId()),
+            ConverterUtils.toString(
+                context.getContainerId().
+                    getApplicationAttemptId().getApplicationId()),
             localizerId, localDirs);
       // TODO handle ExitCodeException separately?
       } catch (Exception e) {
@@ -808,6 +820,7 @@ public class ResourceLocalizationService
     }
 
     @Override
+    @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       dispatcher.getEventHandler().handle(
           new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP));

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Tue Sep 13 22:49:27 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
 import java.util.Collection;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -26,27 +27,23 @@ import org.apache.hadoop.yarn.server.nod
 public class ContainerLocalizationRequestEvent extends
     ContainerLocalizationEvent {
 
-  private final LocalResourceVisibility vis;
-  private final Collection<LocalResourceRequest> reqs;
+  private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>> 
+    rsrc;
 
   /**
-   * Event requesting the localization of the reqs all with visibility vis
+   * Event requesting the localization of the rsrc.
    * @param c
-   * @param reqs
-   * @param vis
+   * @param rsrc
    */
   public ContainerLocalizationRequestEvent(Container c,
-      Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) {
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
     super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
-    this.vis = vis;
-    this.reqs = reqs;
+    this.rsrc = rsrc;
   }
 
-  public LocalResourceVisibility getVisibility() {
-    return vis;
+  public
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+      getRequestedResources() {
+    return rsrc;
   }
-
-  public Collection<LocalResourceRequest> getRequestedResources() {
-    return reqs;
-  }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java Tue Sep 13 22:49:27 2011
@@ -17,8 +17,6 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
-import java.net.URISyntaxException;
-
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 
@@ -26,8 +24,8 @@ public class ResourceReleaseEvent extend
 
   private final ContainerId container;
 
-  public ResourceReleaseEvent(LocalResourceRequest rsrc, ContainerId container)
-      throws URISyntaxException {
+  public ResourceReleaseEvent(LocalResourceRequest rsrc, 
+      ContainerId container) {
     super(rsrc, ResourceEventType.RELEASE);
     this.container = container;
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java Tue Sep 13 22:49:27 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.file.tfile.TFile;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class AggregatedLogFormat {
@@ -89,8 +90,11 @@ public class AggregatedLogFormat {
     public void write(DataOutputStream out) throws IOException {
       for (String rootLogDir : this.rootLogDirs) {
         File appLogDir =
-            new File(rootLogDir, ConverterUtils.toString(this.containerId
-                .getAppId()));
+            new File(rootLogDir, 
+                ConverterUtils.toString(
+                    this.containerId.getApplicationAttemptId().
+                        getApplicationId())
+                );
         File containerLogDir =
             new File(appLogDir, ConverterUtils.toString(this.containerId));
 
@@ -148,8 +152,8 @@ public class AggregatedLogFormat {
       // 256KB minBlockSize : Expected log size for each container too
       this.writer =
           new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
-              LogAggregationService.LOG_COMPRESSION_TYPE,
-              LogAggregationService.DEFAULT_COMPRESSION_TYPE), null, conf);
+              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
+              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
     }
 
     public void append(LogKey logKey, LogValue logValue) throws IOException {