You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/19 19:04:11 UTC

svn commit: r1124996 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ yarn/ya...

Author: vinodkv
Date: Thu May 19 17:04:11 2011
New Revision: 1124996

URL: http://svn.apache.org/viewvc?rev=1124996&view=rev
Log:
Distributed cache bug fix to pass Terasort. Contributed by Vinod Kumar Vavilapalli.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Thu May 19 17:04:11 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Distributed cache bug fix to pass Terasort. (vinodkv)
+
     Add junit jar to lib in assembly (mahadev and luke)
 
     MAPREDUCE-2509. Fix NPE in UI for pending attempts. (luke lu via mahadev) 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu May 19 17:04:11 2011
@@ -510,7 +510,7 @@ public abstract class TaskAttemptImpl im
       // //////////// End of JobConf setup
 
       // Setup DistributedCache
-      setupDistributedCache(conf, container);
+      setupDistributedCache(remoteFS, conf, container);
 
       // Setup up tokens
       Credentials taskCredentials = new Credentials();
@@ -586,11 +586,11 @@ public abstract class TaskAttemptImpl im
     return container;
   }
 
-  private void setupDistributedCache(Configuration conf, 
+  private void setupDistributedCache(FileContext remoteFS, Configuration conf, 
       ContainerLaunchContext container) throws IOException {
     
     // Cache archives
-    parseDistributedCacheArtifacts(container, LocalResourceType.ARCHIVE, 
+    parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.ARCHIVE, 
         DistributedCache.getCacheArchives(conf), 
         DistributedCache.getArchiveTimestamps(conf), 
         getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
@@ -598,7 +598,7 @@ public abstract class TaskAttemptImpl im
         DistributedCache.getArchiveClassPaths(conf));
     
     // Cache files
-    parseDistributedCacheArtifacts(container, LocalResourceType.FILE, 
+    parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.FILE, 
         DistributedCache.getCacheFiles(conf),
         DistributedCache.getFileTimestamps(conf),
         getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
@@ -609,8 +609,8 @@ public abstract class TaskAttemptImpl im
   // TODO - Move this to MR!
   // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
   // long[], boolean[], Path[], FileType)
-  private static void parseDistributedCacheArtifacts(
-      ContainerLaunchContext container, LocalResourceType type,
+  private void parseDistributedCacheArtifacts(
+      FileContext remoteFS, ContainerLaunchContext container, LocalResourceType type,
       URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
       Path[] classpaths) throws IOException {
 
@@ -634,7 +634,11 @@ public abstract class TaskAttemptImpl im
       }
       for (int i = 0; i < uris.length; ++i) {
         URI u = uris[i];
-        Path p = new Path(u.toString());
+        Path p = new Path(u);
+        if (!p.isAbsolute()) {
+          p = p.makeQualified(remoteFS.getDefaultFileSystem()
+              .getUri(), remoteFS.getWorkingDirectory());
+        }
         // Add URI fragment or just the filename
         Path name = new Path((null == u.getFragment())
           ? p.getName()
@@ -645,7 +649,7 @@ public abstract class TaskAttemptImpl im
         container.setLocalResource(
             name.toUri().getPath(),
             BuilderUtils.newLocalResource(recordFactory,
-                uris[i], type, 
+                p.toUri(), type, 
                 visibilities[i]
                   ? LocalResourceVisibility.PUBLIC
                   : LocalResourceVisibility.PRIVATE,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu May 19 17:04:11 2011
@@ -82,7 +82,7 @@ public class ResourceMgrDelegate {
   private ApplicationId applicationId;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
-  public ResourceMgrDelegate(Configuration conf) throws UnsupportedFileSystemException {
+  public ResourceMgrDelegate(Configuration conf) {
     this.conf = conf;
     YarnRPC rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress =

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu May 19 17:04:11 2011
@@ -95,19 +95,20 @@ public class YARNRunner implements Clien
   private ResourceMgrDelegate resMgrDelegate;
   private ClientServiceDelegate clientServiceDelegate;
   private YarnConfiguration conf;
+  private final FileContext defaultFileContext;
 
   /**
    * Yarn runner incapsulates the client interface of
    * yarn
    * @param conf the configuration object for the client
    */
-  public YARNRunner(Configuration conf)
-      throws YarnRemoteException {
+  public YARNRunner(Configuration conf) {
     this.conf = new YarnConfiguration(conf);
     try {
       this.resMgrDelegate = new ResourceMgrDelegate(conf);
       this.clientServiceDelegate = new ClientServiceDelegate(conf,
           resMgrDelegate);
+      this.defaultFileContext = FileContext.getFileContext(conf);
     } catch (UnsupportedFileSystemException ufe) {
       throw new RuntimeException("Error in instantiating YarnClient", ufe);
     }
@@ -269,26 +270,23 @@ public class YARNRunner implements Clien
     LOG.info("AppMaster capability = " + capability);
     appContext.setMasterCapability(capability);
 
-    FileContext defaultFS = FileContext.getFileContext(conf);
     Path jobConfPath =
         new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
     
     URL yarnUrlForJobSubmitDir =
-        ConverterUtils.getYarnUrlFromPath(defaultFS.makeQualified(new Path(
+        ConverterUtils.getYarnUrlFromPath(defaultFileContext.makeQualified(new Path(
             jobSubmitDir)));
-//    appContext.resources = new HashMap<CharSequence, URL>();
     LOG.debug("Creating setup context, jobSubmitDir url is "
         + yarnUrlForJobSubmitDir);
 
     appContext.setResource(YARNApplicationConstants.JOB_SUBMIT_DIR,
         yarnUrlForJobSubmitDir);
 
-//    appContext.resources_todo = new HashMap<CharSequence,LocalResource>();
     appContext.setResourceTodo(YARNApplicationConstants.JOB_CONF_FILE,
-        createApplicationResource(defaultFS,
+        createApplicationResource(defaultFileContext,
             jobConfPath));
     appContext.setResourceTodo(YARNApplicationConstants.JOB_JAR,
-          createApplicationResource(defaultFS,
+          createApplicationResource(defaultFileContext,
             new Path(jobSubmitDir, YARNApplicationConstants.JOB_JAR)));
     
     // TODO gross hack
@@ -296,7 +294,7 @@ public class YARNRunner implements Clien
         YarnConfiguration.APPLICATION_TOKENS_FILE }) {
       appContext.setResourceTodo(
           YARNApplicationConstants.JOB_SUBMIT_DIR + "/" + s,
-          createApplicationResource(defaultFS, new Path(jobSubmitDir, s)));
+          createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s)));
     }
 
     // TODO: Only if security is on.
@@ -385,7 +383,7 @@ public class YARNRunner implements Clien
 
   // TODO - Move this to MR!
   // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
-  private static void parseDistributedCacheArtifacts(
+  private void parseDistributedCacheArtifacts(
       ApplicationSubmissionContext container, LocalResourceType type,
       URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
       Path[] classpaths) throws IOException {
@@ -410,7 +408,11 @@ public class YARNRunner implements Clien
       }
       for (int i = 0; i < uris.length; ++i) {
         URI u = uris[i];
-        Path p = new Path(u.toString());
+        Path p = new Path(u);
+        if (!p.isAbsolute()) {
+          p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
+              .getUri(), this.defaultFileContext.getWorkingDirectory());
+        }
         // Add URI fragment or just the filename
         Path name = new Path((null == u.getFragment())
           ? p.getName()
@@ -420,8 +422,8 @@ public class YARNRunner implements Clien
         }
         container.setResourceTodo(
             name.toUri().getPath(),
-            getLocalResource(
-                uris[i], type, 
+            createLocalResource(
+                p.toUri(), type, 
                 visibilities[i]
                   ? LocalResourceVisibility.PUBLIC
                   : LocalResourceVisibility.PRIVATE,
@@ -449,7 +451,7 @@ public class YARNRunner implements Clien
     return result;
   }
   
-  private static LocalResource getLocalResource(URI uri, 
+  private static LocalResource createLocalResource(URI uri, 
       LocalResourceType type, LocalResourceVisibility visibility, 
       long size, long timestamp) {
     LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Thu May 19 17:04:11 2011
@@ -58,8 +58,7 @@ public class ConverterUtils {
     String authority = url.getHost() != null ? url.getHost() + ":" + url.getPort()
         : "";
     return new Path(
-        (new URI(scheme, authority, url.getFile(), null, null))
-            .normalize());
+        (new URI(scheme, authority, url.getFile(), null, null)).normalize());
   }
   
   /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Thu May 19 17:04:11 2011
@@ -234,7 +234,11 @@ public class ContainerManagerImpl extend
   @Override
   public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
-  
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(" container is " + request);
+    }
+
     // parse credentials
     ByteBuffer tokens = launchContext.getContainerTokens();
     Credentials credentials = new Credentials();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Thu May 19 17:04:11 2011
@@ -24,6 +24,9 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,6 +62,8 @@ import org.apache.hadoop.yarn.util.Conve
 
 public class ContainerImpl implements Container {
 
+  private final Lock readLock;
+  private final Lock writeLock;
   private final Dispatcher dispatcher;
   private final Credentials credentials;
   private final ContainerLaunchContext launchContext;
@@ -79,6 +84,10 @@ public class ContainerImpl implements Co
     this.diagnostics = new StringBuilder();
     this.credentials = creds;
 
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -95,7 +104,8 @@ public class ContainerImpl implements Co
       new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
     // From NEW State
     .addTransition(ContainerState.NEW,
-        EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
+        EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
+            ContainerState.LOCALIZATION_FAILED),
         ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
     .addTransition(ContainerState.NEW, ContainerState.NEW,
         ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@@ -107,7 +117,8 @@ public class ContainerImpl implements Co
     .addTransition(ContainerState.LOCALIZING,
         EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
         ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
-    .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
+    .addTransition(ContainerState.LOCALIZING,
+        ContainerState.LOCALIZATION_FAILED,
         ContainerEventType.RESOURCE_FAILED,
         new KillDuringLocalizationTransition()) // TODO update diagnostics
     .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
@@ -117,6 +128,12 @@ public class ContainerImpl implements Co
         ContainerEventType.KILL_CONTAINER,
         new KillDuringLocalizationTransition())
 
+    // From LOCALIZATION_FAILED State
+    .addTransition(ContainerState.LOCALIZATION_FAILED,
+        ContainerState.DONE,
+        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
+        CONTAINER_DONE_TRANSITION)
+
     // From LOCALIZED State
     .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
         ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
@@ -211,10 +228,11 @@ public class ContainerImpl implements Co
   private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
     stateMachine;
 
-  private synchronized org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
+  private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
     switch (stateMachine.getCurrentState()) {
     case NEW:
     case LOCALIZING:
+    case LOCALIZATION_FAILED:
     case LOCALIZED:
       return org.apache.hadoop.yarn.api.records.ContainerState.INITIALIZING;
     case RUNNING:
@@ -232,55 +250,96 @@ public class ContainerImpl implements Co
 
   @Override
   public ContainerId getContainerID() {
-    return this.launchContext.getContainerId();
+    this.readLock.lock();
+    try {
+      return this.launchContext.getContainerId();
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
   public String getUser() {
-    return this.launchContext.getUser();
+    this.readLock.lock();
+    try {
+      return this.launchContext.getUser();
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
   public Map<Path,String> getLocalizedResources() {
-    assert ContainerState.LOCALIZED == getContainerState();
+    this.readLock.lock();
+    try {
+    assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
     return localizedResources;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
   public Credentials getCredentials() {
-    return credentials;
+    this.readLock.lock();
+    try {
+      return credentials;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
   public ContainerState getContainerState() {
-    return stateMachine.getCurrentState();
+    this.readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
-  public synchronized
+  public
       org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() {
-    org.apache.hadoop.yarn.api.records.Container c =
-      recordFactory.newRecordInstance(
-          org.apache.hadoop.yarn.api.records.Container.class);
-    c.setId(this.launchContext.getContainerId());
-    c.setResource(this.launchContext.getResource());
-    c.setState(getCurrentState());
-    return c;
+    this.readLock.lock();
+    try {
+      org.apache.hadoop.yarn.api.records.Container c =
+        recordFactory.newRecordInstance(
+            org.apache.hadoop.yarn.api.records.Container.class);
+      c.setId(this.launchContext.getContainerId());
+      c.setResource(this.launchContext.getResource());
+      c.setState(getCurrentState());
+      return c;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
   public ContainerLaunchContext getLaunchContext() {
-    return launchContext;
+    this.readLock.lock();
+    try {
+      return launchContext;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
   public ContainerStatus cloneAndGetContainerStatus() {
-    ContainerStatus containerStatus = recordFactory.newRecordInstance(ContainerStatus.class);
-    containerStatus.setState(getCurrentState());
-    containerStatus.setContainerId(this.launchContext.getContainerId());
-    containerStatus.setDiagnostics(diagnostics.toString());
-	  containerStatus.setExitStatus(String.valueOf(exitCode));
-    return containerStatus;
+    this.readLock.lock();
+    try {
+      ContainerStatus containerStatus =
+          recordFactory.newRecordInstance(ContainerStatus.class);
+      containerStatus.setState(getCurrentState());
+      containerStatus.setContainerId(this.launchContext.getContainerId());
+      containerStatus.setDiagnostics(diagnostics.toString());
+  	  containerStatus.setExitStatus(String.valueOf(exitCode));
+      return containerStatus;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   static class ContainerTransition implements
@@ -304,8 +363,8 @@ public class ContainerImpl implements Co
       // Inform the AuxServices about the opaque serviceData
       Map<String,ByteBuffer> csd = ctxt.getAllServiceData();
       if (csd != null) {
-        // TODO: Isn't this supposed to happen only once per Application?
-        // ^ each container may have distinct service data
+        // This can happen more than once per Application as each container may
+        // have distinct service data
         for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
           container.dispatcher.getEventHandler().handle(
               new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
@@ -325,6 +384,7 @@ public class ContainerImpl implements Co
           new ArrayList<LocalResourceRequest>();
         try {
           for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
+            try {
             LocalResourceRequest req =
               new LocalResourceRequest(rsrc.getValue());
             container.pendingResources.put(req, rsrc.getKey());
@@ -339,13 +399,19 @@ public class ContainerImpl implements Co
               appRsrc.add(req);
               break;
             }
+            } catch (URISyntaxException e) {
+              LOG.info("Got exception parsing " + rsrc.getKey()
+                  + " and value " + rsrc.getValue());
+              throw e;
+            }
           }
         } catch (URISyntaxException e) {
           // malformed resource; abort container launch
+          LOG.warn("Failed to parse resource-request", e);
           container.dispatcher.getEventHandler().handle(
               new ContainerLocalizationEvent(
                LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
-          return ContainerState.LOCALIZING;
+          return ContainerState.LOCALIZATION_FAILED;
         }
         if (!publicRsrc.isEmpty()) {
           container.dispatcher.getEventHandler().handle(
@@ -516,28 +582,39 @@ public class ContainerImpl implements Co
   }
 
   @Override
-  public synchronized void handle(ContainerEvent event) {
+  public void handle(ContainerEvent event) {
+    try {
+      this.writeLock.lock();
 
-    ContainerId containerID = event.getContainerID();
-    LOG.info("Processing " + containerID + " of type " + event.getType());
+      ContainerId containerID = event.getContainerID();
+      LOG.info("Processing " + containerID + " of type " + event.getType());
 
-    ContainerState oldState = stateMachine.getCurrentState();
-    ContainerState newState = null;
-    try {
-      newState =
-          stateMachine.doTransition(event.getType(), event);
-    } catch (InvalidStateTransitonException e) {
-      LOG.warn("Can't handle this event at current state", e);
-    }
-    if (oldState != newState) {
-      LOG.info("Container " + containerID + " transitioned from " + oldState
-          + " to " + newState);
+      ContainerState oldState = stateMachine.getCurrentState();
+      ContainerState newState = null;
+      try {
+        newState =
+            stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.warn("Can't handle this event at current state", e);
+      }
+      if (oldState != newState) {
+        LOG.info("Container " + containerID + " transitioned from "
+            + oldState
+            + " to " + newState);
+      }
+    } finally {
+      this.writeLock.unlock();
     }
   }
 
   @Override
   public String toString() {
-    return ConverterUtils.toString(launchContext.getContainerId());
+    this.readLock.lock();
+    try {
+      return ConverterUtils.toString(launchContext.getContainerId());
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java Thu May 19 17:04:11 2011
@@ -19,7 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
 public enum ContainerState {
-  NEW, LOCALIZING, LOCALIZED, RUNNING, EXITED_WITH_SUCCESS,
+  NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, EXITED_WITH_SUCCESS,
   EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL,
   CONTAINER_RESOURCES_CLEANINGUP, DONE
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Thu May 19 17:04:11 2011
@@ -22,6 +22,9 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +39,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
@@ -59,6 +63,9 @@ public class LocalizedResource implement
   final Semaphore sem = new Semaphore(1);
   final Queue<ContainerId> ref; // Queue of containers using this localized
                                 // resource
+  private final Lock readLock;
+  private final Lock writeLock;
+
   final AtomicLong timestamp = new AtomicLong(currentTime());
 
   private static final StateMachineFactory<LocalizedResource,ResourceState,
@@ -96,6 +103,11 @@ public class LocalizedResource implement
     this.rsrc = rsrc;
     this.dispatcher = dispatcher;
     this.ref = new LinkedList<ContainerId>();
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
     this.stateMachine = stateMachineFactory.make(this);
   }
 
@@ -112,21 +124,26 @@ public class LocalizedResource implement
     return sb.toString();
   }
 
-  public void release(ContainerId container) {
+  private void release(ContainerId container) {
     if (!ref.remove(container)) {
       LOG.info("Attempt to release claim on " + this +
                " from unregistered container " + container);
-      assert false;
+      assert false; // TODO: FIX
     }
     timestamp.set(currentTime());
   }
 
-  long currentTime() {
+  private long currentTime() {
     return System.nanoTime();
   }
 
   public ResourceState getState() {
-    return stateMachine.getCurrentState();
+    this.readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   public LocalResourceRequest getRequest() {
@@ -142,9 +159,28 @@ public class LocalizedResource implement
   }
 
   @Override
-  public synchronized void handle(ResourceEvent event) {
-    stateMachine.doTransition(event.getType(), event);
-    // TODO: Invalid transitions?
+  public void handle(ResourceEvent event) {
+    try {
+      this.writeLock.lock();
+
+      Path resourcePath = event.getLocalResourceRequest().getPath();
+      LOG.info("Processing " + resourcePath + " of type " + event.getType());
+
+      ResourceState oldState = this.stateMachine.getCurrentState();
+      ResourceState newState = null;
+      try {
+        newState = this.stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.warn("Can't handle this event at current state", e);
+      }
+      if (oldState != newState) {
+        LOG.info("Resource " + resourcePath + " transitioned from "
+            + oldState
+            + " to " + newState);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   static abstract class ResourceTransition implements

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu May 19 17:04:11 2011
@@ -534,6 +534,8 @@ public class ResourceLocalizationService
             LOG.info("DEBUG: FAILED " + req, stat.getException());
             assoc.getResource().unlock();
             response.setLocalizerAction(LocalizerAction.DIE);
+            // TODO: Why is this event going directly to the container? Why not
+            // the resource itself? What happens to the resource? Is it removed?
             dispatcher.getEventHandler().handle(
                 new ContainerResourceFailedEvent(context.getContainerId(),
                   req, stat.getException()));