You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by at...@apache.org on 2013/04/13 01:05:38 UTC

svn commit: r1467511 [3/6] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Apr 12 23:05:28 2013
@@ -21,17 +21,20 @@ import java.io.File;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+
 
 /**
  * A collection of {@link LocalizedResource}s all of same
@@ -49,30 +52,72 @@ class LocalResourcesTrackerImpl implemen
   private final String user;
   private final Dispatcher dispatcher;
   private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
+  private Configuration conf;
+  /*
+   * This flag controls whether this resource tracker uses hierarchical
+   * directories or not. For PRIVATE and PUBLIC resource trackers it
+   * will be set whereas for APPLICATION resource tracker it would
+   * be false.
+   */
+  private final boolean useLocalCacheDirectoryManager;
+  private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers;
+  /*
+   * It is used to keep track of resource into hierarchical directory
+   * while it is getting downloaded. It is useful for reference counting
+   * in case resource localization fails.
+   */
+  private ConcurrentHashMap<LocalResourceRequest, Path>
+    inProgressLocalResourcesMap;
+  /*
+   * starting with 10 to accommodate 0-9 directories created as a part of
+   * LocalCacheDirectoryManager. So there will be one unique number generator
+   * per APPLICATION, USER and PUBLIC cache.
+   */
+  private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
 
-  public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
+  public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+      boolean useLocalCacheDirectoryManager, Configuration conf) {
     this(user, dispatcher,
-        new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>());
+      new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
+      useLocalCacheDirectoryManager, conf);
   }
 
   LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
-      ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) {
+      ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
+      boolean useLocalCacheDirectoryManager, Configuration conf) {
     this.user = user;
     this.dispatcher = dispatcher;
     this.localrsrc = localrsrc;
+    this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
+    if ( this.useLocalCacheDirectoryManager) {
+      directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
+      inProgressLocalResourcesMap =
+        new ConcurrentHashMap<LocalResourceRequest, Path>();
+    }
+    this.conf = conf;
   }
 
+  /*
+   * Synchronizing this method for avoiding races due to multiple ResourceEvent's
+   * coming to LocalResourcesTracker from Public/Private localizer and
+   * Resource Localization Service.
+   */
   @Override
-  public void handle(ResourceEvent event) {
+  public synchronized void handle(ResourceEvent event) {
     LocalResourceRequest req = event.getLocalResourceRequest();
     LocalizedResource rsrc = localrsrc.get(req);
     switch (event.getType()) {
-    case REQUEST:
     case LOCALIZED:
+      if (useLocalCacheDirectoryManager) {
+        inProgressLocalResourcesMap.remove(req);
+      }
+      break;
+    case REQUEST:
       if (rsrc != null && (!isResourcePresent(rsrc))) {
         LOG.info("Resource " + rsrc.getLocalPath()
             + " is missing, localizing it again");
         localrsrc.remove(req);
+        decrementFileCountForLocalCacheDirectory(req, rsrc);
         rsrc = null;
       }
       if (null == rsrc) {
@@ -82,15 +127,74 @@ class LocalResourcesTrackerImpl implemen
       break;
     case RELEASE:
       if (null == rsrc) {
-        LOG.info("Release unknown rsrc null (discard)");
+        // The container sent a release event on a resource which 
+        // 1) Failed
+        // 2) Removed for some reason (ex. disk is no longer accessible)
+        ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
+        LOG.info("Container " + relEvent.getContainer()
+            + " sent RELEASE event on a resource request " + req
+            + " not present in cache.");
         return;
       }
       break;
+    case LOCALIZATION_FAILED:
+      decrementFileCountForLocalCacheDirectory(req, null);
+      /*
+       * If resource localization fails then Localized resource will be
+       * removed from local cache.
+       */
+      localrsrc.remove(req);
+      break;
     }
     rsrc.handle(event);
   }
 
-  /**
+  /*
+   * Update the file-count statistics for a local cache-directory.
+   * The localized path for the resource is taken from
+   * 1) inProgressLocalResourcesMap, if the resource was still under
+   *    localization (e.g. its localization failed), otherwise
+   * 2) the LocalizedResource itself, if it is already localized. The code
+   *    assumes its local path looks like <hierarchical dir>/<unique id>/<file>,
+   *    so the grandparent of the local path is the hierarchical directory.
+   * The closest ancestor of that path which owns a LocalCacheDirectoryManager
+   * identifies the local directory; the remainder of the path (or "" for the
+   * local directory itself) is the sub-directory whose file count is
+   * decremented. Nothing happens when hierarchical directories are disabled,
+   * no path can be determined, or no directory manager matches.
+   */
+  private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
+      LocalizedResource rsrc) {
+    if (!useLocalCacheDirectoryManager) {
+      return;
+    }
+    // A single remove() avoids a check-then-act race on the concurrent map;
+    // it returns null when the request was not in progress.
+    Path rsrcPath = inProgressLocalResourcesMap.remove(req);
+    if (rsrcPath == null && rsrc != null && rsrc.getLocalPath() != null) {
+      Path parent = rsrc.getLocalPath().getParent();
+      // getParent() returns null for shallow paths; avoid an NPE here.
+      rsrcPath = (parent == null) ? null : parent.getParent();
+    }
+    if (rsrcPath == null) {
+      return;
+    }
+    // Walk up until we reach a local directory that owns a directory manager.
+    Path parentPath = new Path(rsrcPath.toUri().getRawPath());
+    while (!directoryManagers.containsKey(parentPath)) {
+      parentPath = parentPath.getParent();
+      if (parentPath == null) {
+        return;
+      }
+    }
+    LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
+    String parentDir = parentPath.toUri().getRawPath();
+    String rsrcDir = rsrcPath.toUri().getRawPath();
+    if (rsrcDir.equals(parentDir)) {
+      dir.decrementFileCountForPath("");
+    } else {
+      // Skip the separator following parentDir, unless parentDir already
+      // ends with one (which is the case for the root path).
+      int offset = parentDir.endsWith(Path.SEPARATOR)
+          ? parentDir.length() : parentDir.length() + 1;
+      dir.decrementFileCountForPath(rsrcDir.substring(offset));
+    }
+  }
+
+/**
    * This module checks if the resource which was localized is already present
    * or not
    * 
@@ -100,7 +204,8 @@ class LocalResourcesTrackerImpl implemen
   public boolean isResourcePresent(LocalizedResource rsrc) {
     boolean ret = true;
     if (rsrc.getState() == ResourceState.LOCALIZED) {
-      File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+      File file = new File(rsrc.getLocalPath().toUri().getRawPath().
+        toString());
       if (!file.exists()) {
         ret = false;
       }
@@ -133,11 +238,11 @@ class LocalResourcesTrackerImpl implemen
       if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
         delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
       }
+      decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
       return true;
     }
   }
 
-
   /**
    * Returns the path up to the random directory component.
    */
@@ -163,4 +268,43 @@ class LocalResourcesTrackerImpl implemen
   public Iterator<LocalizedResource> iterator() {
     return localrsrc.values().iterator();
   }
-}
+
+  /**
+   * @return {@link Path} absolute path for localization which includes local
+   *         directory path and the relative hierarchical path (if use local
+   *         cache directory manager is enabled)
+   * 
+   * @param {@link LocalResourceRequest} Resource localization request to
+   *        localize the resource.
+   * @param {@link Path} local directory path
+   */
+  @Override
+  public Path
+      getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+    if (useLocalCacheDirectoryManager && localDirPath != null) {
+
+      if (!directoryManagers.containsKey(localDirPath)) {
+        directoryManagers.putIfAbsent(localDirPath,
+          new LocalCacheDirectoryManager(conf));
+      }
+      LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
+
+      Path rPath = localDirPath;
+      String hierarchicalPath = dir.getRelativePathForLocalization();
+      // For most of the scenarios we will get root path only which
+      // is an empty string
+      if (!hierarchicalPath.isEmpty()) {
+        rPath = new Path(localDirPath, hierarchicalPath);
+      }
+      inProgressLocalResourcesMap.put(req, rPath);
+      return rPath;
+    } else {
+      return localDirPath;
+    }
+  }
+
+  /**
+   * Hands out the next value of this tracker's counter. The counter starts
+   * at 9, so the first value returned is 10.
+   */
+  @Override
+  public long nextUniqueNumber() {
+    final long next = uniqueNumberGenerator.incrementAndGet();
+    return next;
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Fri Apr 12 23:05:28 2013
@@ -32,10 +32,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -89,6 +91,8 @@ public class LocalizedResource implement
     .addTransition(ResourceState.DOWNLOADING,
         EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
         ResourceEventType.RELEASE, new ReleasePendingTransition())
+    .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
+        ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
 
     // From LOCALIZED (ref >= 0, on disk)
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
@@ -126,12 +130,14 @@ public class LocalizedResource implement
   }
 
   private void release(ContainerId container) {
-    if (!ref.remove(container)) {
-      LOG.info("Attempt to release claim on " + this +
-               " from unregistered container " + container);
-      assert false; // TODO: FIX
+    if (ref.remove(container)) {
+      // updating the timestamp only in case of success.
+      timestamp.set(currentTime());
+    } else {
+      LOG.info("Container " + container
+          + " doesn't exist in the container list of the Resource " + this
+          + " to which it sent RELEASE event");
     }
-    timestamp.set(currentTime());
   }
 
   private long currentTime() {
@@ -251,6 +257,25 @@ public class LocalizedResource implement
   }
 
   /**
+   * Resource localization failed, notify waiting containers.
+   */
+  @SuppressWarnings("unchecked")
+  private static class FetchFailedTransition extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      // This transition is registered only for LOCALIZATION_FAILED events,
+      // so the cast below is expected to hold.
+      final ResourceFailedLocalizationEvent failure =
+          (ResourceFailedLocalizationEvent) event;
+      final Throwable cause = failure.getCause();
+      // Notify every container currently referencing this resource.
+      for (ContainerId waiting : rsrc.ref) {
+        rsrc.dispatcher.getEventHandler().handle(
+            new ContainerResourceFailedEvent(waiting,
+                failure.getLocalResourceRequest(), cause));
+      }
+    }
+  }
+
+  /**
    * Resource already localized, notify immediately.
    */
   @SuppressWarnings("unchecked") // dispatcher not typed

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Apr 12 23:05:28 2013
@@ -34,7 +34,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -64,6 +63,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -99,11 +100,13 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -130,7 +133,7 @@ public class ResourceLocalizationService
   private RecordFactory recordFactory;
   private final ScheduledExecutorService cacheCleanup;
 
-  private final LocalResourcesTracker publicRsrc;
+  private LocalResourcesTracker publicRsrc;
 
   private LocalDirsHandlerService dirsHandler;
 
@@ -158,7 +161,6 @@ public class ResourceLocalizationService
     this.delService = delService;
     this.dirsHandler = dirsHandler;
 
-    this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
     this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
         new ThreadFactoryBuilder()
           .setNameFormat("ResourceLocalizationService Cache Cleanup")
@@ -173,8 +175,26 @@ public class ResourceLocalizationService
     }
   }
 
+  private void validateConf(Configuration conf) {
+    int perDirFileLimit =
+        conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+          YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);
+    if (perDirFileLimit <= 36) {
+      LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+          + " parameter is configured with very low value.");
+      throw new YarnException(
+        YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+            + " parameter is configured with a value less than 37.");
+    } else {
+      LOG.info("per directory file limit = " + perDirFileLimit);
+    }
+  }
+
   @Override
   public void init(Configuration conf) {
+    this.validateConf(conf);
+    this.publicRsrc =
+        new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {
@@ -212,6 +232,7 @@ public class ResourceLocalizationService
         YarnConfiguration.NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+
     localizerTracker = createLocalizerTracker(conf);
     addService(localizerTracker);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
@@ -306,15 +327,17 @@ public class ResourceLocalizationService
   private void handleInitApplicationResources(Application app) {
     // 0) Create application tracking structs
     String userName = app.getUser();
-    privateRsrc.putIfAbsent(userName,
-        new LocalResourcesTrackerImpl(userName, dispatcher));
-    if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
-        new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
+    privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
+      dispatcher, true, super.getConfig()));
+    if (null != appRsrc.putIfAbsent(
+      ConverterUtils.toString(app.getAppId()),
+      new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
+        .getConfig()))) {
       LOG.warn("Initializing application " + app + " already present");
       assert false; // TODO: FIXME assert doesn't help
                     // ^ The condition is benign. Tests should fail and it
-                    //   should appear in logs, but it's an internal error
-                    //   that should have no effect on applications
+                    // should appear in logs, but it's an internal error
+                    // that should have no effect on applications
     }
     // 1) Signal container init
     //
@@ -455,6 +478,21 @@ public class ResourceLocalizationService
     }
   }
 
+  /*
+   * Relative path of a user's private file cache:
+   * "." / USERCACHE / user / FILECACHE, joined with Path.SEPARATOR.
+   */
+  private String getUserFileCachePath(String user) {
+    return new StringBuilder(".")
+        .append(Path.SEPARATOR).append(ContainerLocalizer.USERCACHE)
+        .append(Path.SEPARATOR).append(user)
+        .append(Path.SEPARATOR).append(ContainerLocalizer.FILECACHE)
+        .toString();
+  }
+
+  /*
+   * Relative path of an application's cache for a user:
+   * "." / USERCACHE / user / APPCACHE / appId, joined with Path.SEPARATOR.
+   */
+  private String getUserAppCachePath(String user, String appId) {
+    return new StringBuilder(".")
+        .append(Path.SEPARATOR).append(ContainerLocalizer.USERCACHE)
+        .append(Path.SEPARATOR).append(user)
+        .append(Path.SEPARATOR).append(ContainerLocalizer.APPCACHE)
+        .append(Path.SEPARATOR).append(appId)
+        .toString();
+  }
+
   /**
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    */
@@ -620,8 +658,18 @@ public class ResourceLocalizationService
             Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
                 "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
                 ContainerLocalizer.getEstimatedSize(resource), true);
+            Path hierarchicalPath =
+              publicRsrc.getPathForLocalization(key, publicDirDestPath);
+            if (!hierarchicalPath.equals(publicDirDestPath)) {
+              publicDirDestPath = hierarchicalPath;
+              DiskChecker.checkDir(
+                new File(publicDirDestPath.toUri().getPath()));
+            }
+            publicDirDestPath =
+                new Path(publicDirDestPath, Long.toString(publicRsrc
+                  .nextUniqueNumber()));
             pending.put(queue.submit(new FSDownload(
-                lfs, null, conf, publicDirDestPath, resource, new Random())),
+                lfs, null, conf, publicDirDestPath, resource)),
                 request);
             attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
           } catch (IOException e) {
@@ -635,7 +683,6 @@ public class ResourceLocalizationService
     }
 
     @Override
-    @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       try {
         // TODO shutdown, better error handling esp. DU
@@ -651,22 +698,19 @@ public class ResourceLocalizationService
                 return;
               }
               LocalResourceRequest key = assoc.getResource().getRequest();
-              assoc.getResource().handle(
-                  new ResourceLocalizedEvent(key,
-                    local, FileUtil.getDU(new File(local.toUri()))));
+              publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
+                .getDU(new File(local.toUri()))));
               synchronized (attempts) {
                 attempts.remove(key);
               }
             } catch (ExecutionException e) {
               LOG.info("Failed to download rsrc " + assoc.getResource(),
                   e.getCause());
-              dispatcher.getEventHandler().handle(
-                  new ContainerResourceFailedEvent(
-                    assoc.getContext().getContainerId(),
-                    assoc.getResource().getRequest(), e.getCause()));
-              List<LocalizerResourceRequestEvent> reqs;
+              LocalResourceRequest req = assoc.getResource().getRequest();
+              publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
+                .getCause()));
               synchronized (attempts) {
-                LocalResourceRequest req = assoc.getResource().getRequest();
+                List<LocalizerResourceRequestEvent> reqs;
                 reqs = attempts.get(req);
                 if (null == reqs) {
                   LOG.error("Missing pending list for " + req);
@@ -674,13 +718,6 @@ public class ResourceLocalizationService
                 }
                 attempts.remove(req);
               }
-              // let the other containers know about the localization failure
-              for (LocalizerResourceRequestEvent reqEvent : reqs) {
-                dispatcher.getEventHandler().handle(
-                    new ContainerResourceFailedEvent(
-                        reqEvent.getContext().getContainerId(),
-                        reqEvent.getResource().getRequest(), e.getCause()));
-              }
             } catch (CancellationException e) {
               // ignore; shutting down
             }
@@ -760,20 +797,34 @@ public class ResourceLocalizationService
       return null;
     }
 
-    // TODO this sucks. Fix it later
-    @SuppressWarnings("unchecked") // dispatcher not typed
     LocalizerHeartbeatResponse update(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
 
+      String user = context.getUser();
+      ApplicationId applicationId =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
       // The localizer has just spawned. Start giving it resources for
       // remote-fetching.
       if (remoteResourceStatuses.isEmpty()) {
         LocalResource next = findNextResource();
         if (next != null) {
           response.setLocalizerAction(LocalizerAction.LIVE);
-          response.addResource(next);
+          try {
+            ArrayList<ResourceLocalizationSpec> rsrcs =
+                new ArrayList<ResourceLocalizationSpec>();
+            ResourceLocalizationSpec rsrc =
+                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                  getPathForLocalization(next));
+            rsrcs.add(rsrc);
+            response.setResourceSpecs(rsrcs);
+          } catch (IOException e) {
+            LOG.error("local path for PRIVATE localization could not be found."
+                + "Disks might have failed.", e);
+          } catch (URISyntaxException e) {
+            // TODO fail? Already translated several times...
+          }
         } else if (pending.isEmpty()) {
           // TODO: Synchronization
           response.setLocalizerAction(LocalizerAction.DIE);
@@ -782,6 +833,12 @@ public class ResourceLocalizationService
         }
         return response;
       }
+      ArrayList<ResourceLocalizationSpec> rsrcs =
+          new ArrayList<ResourceLocalizationSpec>();
+       /*
+        * TODO : It doesn't support multiple downloads per ContainerLocalizer
+        * at the same time. We need to think whether we should support this.
+        */
 
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
@@ -801,10 +858,10 @@ public class ResourceLocalizationService
           case FETCH_SUCCESS:
             // notify resource
             try {
-              assoc.getResource().handle(
-                  new ResourceLocalizedEvent(req,
-                    ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
-                    stat.getLocalSize()));
+            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+              .handle(
+                new ResourceLocalizedEvent(req, ConverterUtils
+                  .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
             } catch (URISyntaxException e) { }
             if (pending.isEmpty()) {
               // TODO: Synchronization
@@ -814,7 +871,17 @@ public class ResourceLocalizationService
             response.setLocalizerAction(LocalizerAction.LIVE);
             LocalResource next = findNextResource();
             if (next != null) {
-              response.addResource(next);
+              try {
+                ResourceLocalizationSpec resource =
+                    NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                      getPathForLocalization(next));
+                rsrcs.add(resource);
+              } catch (IOException e) {
+                LOG.error("local path for PRIVATE localization could not be " +
+                  "found. Disks might have failed.", e);
+              } catch (URISyntaxException e) {
+                  //TODO fail? Already translated several times...
+              }
             }
             break;
           case FETCH_PENDING:
@@ -824,24 +891,45 @@ public class ResourceLocalizationService
             LOG.info("DEBUG: FAILED " + req, stat.getException());
             assoc.getResource().unlock();
             response.setLocalizerAction(LocalizerAction.DIE);
-            // TODO: Why is this event going directly to the container. Why not
-            // the resource itself? What happens to the resource? Is it removed?
-            dispatcher.getEventHandler().handle(
-                new ContainerResourceFailedEvent(context.getContainerId(),
-                  req, stat.getException()));
+            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+              .handle(
+                new ResourceFailedLocalizationEvent(req, stat.getException()));
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
             response.setLocalizerAction(LocalizerAction.DIE);
-            dispatcher.getEventHandler().handle(
-                new ContainerResourceFailedEvent(context.getContainerId(),
-                  req, stat.getException()));
+            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+              .handle(
+                new ResourceFailedLocalizationEvent(req, stat.getException()));
             break;
         }
       }
+      response.setResourceSpecs(rsrcs);
       return response;
     }
 
+    private Path getPathForLocalization(LocalResource rsrc) throws IOException,
+        URISyntaxException {
+      String owner = context.getUser();
+      ApplicationId app =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
+      LocalResourceVisibility visibility = rsrc.getVisibility();
+      LocalResourcesTracker tracker =
+          getLocalResourcesTracker(visibility, owner, app);
+      // PRIVATE -> per-user filecache; any other visibility -> per-app appcache.
+      String cacheDir = (visibility == LocalResourceVisibility.PRIVATE)
+          ? getUserFileCachePath(owner)
+          : getUserAppCachePath(owner, app.toString());
+      // Ask dirsHandler for a dir sized for the estimate, then let the tracker
+      // choose the sub-directory used for this particular resource.
+      Path localDir = dirsHandler.getLocalPathForWrite(cacheDir,
+          ContainerLocalizer.getEstimatedSize(rsrc), false);
+      Path trackedDir = tracker.getPathForLocalization(
+          new LocalResourceRequest(rsrc), localDir);
+      // Leaf directory name is the tracker's next unique number.
+      return new Path(trackedDir, Long.toString(tracker.nextUniqueNumber()));
+    }
+
     @Override
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java Fri Apr 12 23:05:28 2013
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
 enum ResourceState {
   INIT,
   DOWNLOADING,
-  LOCALIZED
+  LOCALIZED,
+  FAILED
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Fri Apr 12 23:05:28 2013
@@ -29,5 +29,7 @@ public enum ResourceEventType {
   /** See {@link ResourceLocalizedEvent} */ 
   LOCALIZED,
   /** See {@link ResourceReleaseEvent} */
-  RELEASE
+  RELEASE,
+  /** See {@link ResourceFailedLocalizationEvent} */
+  LOCALIZATION_FAILED
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java Fri Apr 12 23:05:28 2013
@@ -72,7 +72,7 @@ public class ContainerInfo {
     }
 
     this.user = container.getUser();
-    Resource res = container.getLaunchContext().getResource();
+    Resource res = container.getResource();
     if (res != null) {
       this.totalMemoryNeededMB = res.getMemory();
     }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto Fri Apr 12 23:05:28 2013
@@ -47,7 +47,12 @@ enum LocalizerActionProto {
   DIE = 2;
 }
 
+message ResourceLocalizationSpecProto {
+  optional LocalResourceProto resource = 1;
+  optional URLProto destination_directory = 2;
+}
+
 message LocalizerHeartbeatResponseProto {
   optional LocalizerActionProto action = 1;
-  repeated LocalResourceProto resources = 2;
+  repeated ResourceLocalizationSpecProto resources = 2;
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java Fri Apr 12 23:05:28 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 
 /**
  * This class allows a node manager to run without without communicating with a
@@ -73,9 +74,9 @@ public class MockNodeStatusUpdater exten
       LOG.info("Got heartbeat number " + heartBeatID);
       nodeStatus.setResponseId(heartBeatID++);
 
-      NodeHeartbeatResponse nhResponse = recordFactory
-          .newRecordInstance(NodeHeartbeatResponse.class);
-      nhResponse.setResponseId(heartBeatID);
+      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
+          .newNodeHeartbeatResponse(heartBeatID, null, null,
+              null, null, 1000L);
       return nhResponse;
     }
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Fri Apr 12 23:05:28 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.junit.Test;
+import static org.mockito.Mockito.*;
+
 
 public class TestEventFlow {
 
@@ -117,12 +120,15 @@ public class TestEventFlow {
     applicationAttemptId.setApplicationId(applicationId);
     applicationAttemptId.setAttemptId(0);
     cID.setApplicationAttemptId(applicationAttemptId);
-    launchContext.setContainerId(cID);
+    Container mockContainer = mock(Container.class);
+    when(mockContainer.getId()).thenReturn(cID);
+    when(mockContainer.getResource()).thenReturn(recordFactory
+        .newRecordInstance(Resource.class));
     launchContext.setUser("testing");
-    launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
     StartContainerRequest request = 
         recordFactory.newRecordInstance(StartContainerRequest.class);
     request.setContainerLaunchContext(launchContext);
+    request.setContainer(mockContainer);
     containerManager.startContainer(request);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cID,

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java Fri Apr 12 23:05:28 2013
@@ -99,7 +99,9 @@ public class TestNodeManagerReboot {
         Records.newRecord(ContainerLaunchContext.class);
     // Construct the Container-id
     ContainerId cId = createContainerId();
-    containerLaunchContext.setContainerId(cId);
+    org.apache.hadoop.yarn.api.records.Container mockContainer =
+        mock(org.apache.hadoop.yarn.api.records.Container.class);
+    when(mockContainer.getId()).thenReturn(cId);
 
     containerLaunchContext.setUser(user);
 
@@ -122,12 +124,13 @@ public class TestNodeManagerReboot {
     containerLaunchContext.setUser(containerLaunchContext.getUser());
     List<String> commands = new ArrayList<String>();
     containerLaunchContext.setCommands(commands);
-    containerLaunchContext.setResource(Records
-        .newRecord(Resource.class));
-    containerLaunchContext.getResource().setMemory(1024);
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setMemory(1024);
+    when(mockContainer.getResource()).thenReturn(resource);
     StartContainerRequest startRequest =
         Records.newRecord(StartContainerRequest.class);
     startRequest.setContainerLaunchContext(containerLaunchContext);
+    startRequest.setContainer(mockContainer);
     containerManager.startContainer(startRequest);
 
     GetContainerStatusRequest request =
@@ -160,7 +163,10 @@ public class TestNodeManagerReboot {
         "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
         ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
 
-    nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
+    // restart the NodeManager
+    nm.stop();
+    nm = new MyNodeManager();
+    nm.start();    
 
     numTries = 0;
     while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
@@ -250,26 +256,6 @@ public class TestNodeManagerReboot {
       return delService;
     }
 
-    // mimic part of reboot process
-    @Override
-    public void handle(NodeManagerEvent event) {
-      switch (event.getType()) {
-        case SHUTDOWN:
-          this.stop();
-          break;
-        case REBOOT:
-          this.stop();
-          this.createNewMyNodeManager().start();
-          break;
-        default:
-          LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
-      }
-    }
-
-    private MyNodeManager createNewMyNodeManager() {
-      return new MyNodeManager();
-    }
-
     private YarnConfiguration createNMConfig() {
       YarnConfiguration conf = new YarnConfiguration();
       conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Fri Apr 12 23:05:28 2013
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -28,6 +31,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
 
 import junit.framework.Assert;
 
@@ -38,6 +44,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -49,9 +56,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -71,6 +81,7 @@ public class TestNodeManagerShutdown {
       .getRecordFactory(null);
   static final String user = "nobody";
   private FileContext localFS;
+  private CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
   @Before
   public void setup() throws UnsupportedFileSystemException {
@@ -91,16 +102,69 @@ public class TestNodeManagerShutdown {
     NodeManager nm = getNodeManager();
     nm.init(createNMConfig());
     nm.start();
+    startContainers(nm);
+    
+    final int MAX_TRIES=20;
+    int numTries = 0;
+    while (!processStartFile.exists() && numTries < MAX_TRIES) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ex) {ex.printStackTrace();}
+      numTries++;
+    }
+    
+    nm.stop();
     
+    // Now verify the contents of the file
+    // Script generates a message when it receives a sigterm
+    // so we look for that
+    BufferedReader reader =
+        new BufferedReader(new FileReader(processStartFile));
+
+    boolean foundSigTermMessage = false;
+    while (true) {
+      String line = reader.readLine();
+      if (line == null) {
+        break;
+      }
+      if (line.contains("SIGTERM")) {
+        foundSigTermMessage = true;
+        break;
+      }
+    }
+    Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
+    reader.close();
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKillContainersOnResync() throws IOException, InterruptedException {
+    TestNodeManager nm = new TestNodeManager();
+    nm.init(createNMConfig());
+    nm.start();
+    startContainers(nm);
+    Assert.assertEquals(1, nm.getNMRegistrationCount());
+    // RESYNC should clear the containers and make the NM register again.
+    nm.getNMDispatcher().getEventHandler().handle(
+        new NodeManagerEvent(NodeManagerEventType.RESYNC));
+    try {
+      syncBarrier.await(); // other party: TestNodeStatusUpdaterImpl.reboot
+    } catch (BrokenBarrierException e) {
+      Assert.fail("Barrier broken while waiting for resync: " + e);
+    }
+    Assert.assertEquals(2, nm.getNMRegistrationCount());
+  }
+
+  private void startContainers(NodeManager nm) throws IOException {
     ContainerManagerImpl containerManager = nm.getContainerManager();
     File scriptFile = createUnhaltingScriptFile();
     
-    ContainerLaunchContext containerLaunchContext = 
+    ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
+    Container mockContainer = mock(Container.class);
     // Construct the Container-id
     ContainerId cId = createContainerId();
-    containerLaunchContext.setContainerId(cId);
+    when(mockContainer.getId()).thenReturn(cId);
 
     containerLaunchContext.setUser(user);
 
@@ -124,11 +188,12 @@ public class TestNodeManagerShutdown {
     commands.add("/bin/bash");
     commands.add(scriptFile.getAbsolutePath());
     containerLaunchContext.setCommands(commands);
-    containerLaunchContext.setResource(recordFactory
-        .newRecordInstance(Resource.class));
-    containerLaunchContext.getResource().setMemory(1024);
-    StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    when(mockContainer.getResource()).thenReturn(resource);
+    StartContainerRequest startRequest =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
     startRequest.setContainerLaunchContext(containerLaunchContext);
+    startRequest.setContainer(mockContainer);
     containerManager.startContainer(startRequest);
     
     GetContainerStatusRequest request =
@@ -137,37 +202,6 @@ public class TestNodeManagerShutdown {
     ContainerStatus containerStatus =
         containerManager.getContainerStatus(request).getStatus();
     Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
-    
-    final int MAX_TRIES=20;
-    int numTries = 0;
-    while (!processStartFile.exists() && numTries < MAX_TRIES) {
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException ex) {ex.printStackTrace();}
-      numTries++;
-    }
-    
-    nm.stop();
-    
-    // Now verify the contents of the file
-    // Script generates a message when it receives a sigterm
-    // so we look for that
-    BufferedReader reader =
-        new BufferedReader(new FileReader(processStartFile));
-
-    boolean foundSigTermMessage = false;
-    while (true) {
-      String line = reader.readLine();
-      if (line == null) {
-        break;
-      }
-      if (line.contains("SIGTERM")) {
-        foundSigTermMessage = true;
-        break;
-      }
-    }
-    Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
-    reader.close();
   }
   
   private ContainerId createContainerId() {
@@ -226,4 +260,48 @@ public class TestNodeManagerShutdown {
       }
     };
   }
+
+  class TestNodeManager extends NodeManager {
+    // Times registerWithRM() ran; test expects 1 after start, 2 after RESYNC.
+    private int registrationCount = 0;
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl(context, dispatcher,
+          healthChecker, metrics);
+    }
+
+    public int getNMRegistrationCount() {
+      return registrationCount;
+    }
+
+    class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
+      public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+      // Count registrations so the test can detect re-registration on RESYNC.
+      @Override
+      protected void registerWithRM() throws YarnRemoteException {
+        super.registerWithRM();
+        registrationCount++;
+      }
+      // Reach the barrier in finally so a failed check cannot hang the test.
+      @Override
+      protected void rebootNodeStatusUpdater() {
+        try { // containers must be cleaned up before the updater restarts
+          ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container> containers =
+              getNMContext().getContainers();
+          Assert.assertTrue(containers.isEmpty());
+          super.rebootNodeStatusUpdater();
+        } finally {
+          try {
+            syncBarrier.await();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          } catch (BrokenBarrierException ignored) { /* test thread sees it too */ }
+        }
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Fri Apr 12 23:05:28 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -29,6 +30,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -43,14 +46,17 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -58,12 +64,15 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.service.Service.STATE;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -90,7 +99,8 @@ public class TestNodeStatusUpdater {
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
   private final Configuration conf = createNMConfig();
   private NodeManager nm;
-  protected NodeManager rebootedNodeManager;
+  private boolean containerStatusBackupSuccessfully = true;
+  private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
 
   @After
   public void tearDown() {
@@ -159,9 +169,15 @@ public class TestNodeStatusUpdater {
         throws YarnRemoteException {
       NodeStatus nodeStatus = request.getNodeStatus();
       LOG.info("Got heartbeat number " + heartBeatID);
+      NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
+      Dispatcher mockDispatcher = mock(Dispatcher.class);
+      EventHandler mockEventHandler = mock(EventHandler.class);
+      when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
+      org.apache.hadoop.yarn.api.records.Container mockContainer =
+          mock(org.apache.hadoop.yarn.api.records.Container.class);
       if (heartBeatID == 1) {
         Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
 
@@ -172,10 +188,13 @@ public class TestNodeStatusUpdater {
         firstContainerID.setId(heartBeatID);
         ContainerLaunchContext launchContext = recordFactory
             .newRecordInstance(ContainerLaunchContext.class);
-        launchContext.setContainerId(firstContainerID);
-        launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
-        launchContext.getResource().setMemory(2);
-        Container container = new ContainerImpl(conf , null, launchContext, null, null);
+        // Stub id and resource on mockContainer; it is passed to ContainerImpl.
+        when(mockContainer.getId()).thenReturn(firstContainerID);
+        Resource resource = BuilderUtils.newResource(2, 1);
+        when(mockContainer.getResource()).thenReturn(resource);
+        Container container =
+            new ContainerImpl(conf, mockDispatcher, launchContext,
+                mockContainer, null, mockMetrics);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -196,10 +214,13 @@ public class TestNodeStatusUpdater {
         secondContainerID.setId(heartBeatID);
         ContainerLaunchContext launchContext = recordFactory
             .newRecordInstance(ContainerLaunchContext.class);
-        launchContext.setContainerId(secondContainerID);
-        launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
-        launchContext.getResource().setMemory(3);
-        Container container = new ContainerImpl(conf, null, launchContext, null, null);
+        // Stub id and resource on mockContainer; it is passed to ContainerImpl.
+        when(mockContainer.getId()).thenReturn(secondContainerID);
+        Resource resource = BuilderUtils.newResource(3, 1);
+        when(mockContainer.getResource()).thenReturn(resource);
+        Container container =
+            new ContainerImpl(conf, mockDispatcher, launchContext,
+                mockContainer, null, mockMetrics);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -214,27 +234,49 @@ public class TestNodeStatusUpdater {
         Assert.assertEquals(2, activeContainers.size());
       }
 
-      NodeHeartbeatResponse nhResponse = recordFactory
-          .newRecordInstance(NodeHeartbeatResponse.class);
-      nhResponse.setResponseId(heartBeatID);
+      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+          newNodeHeartbeatResponse(heartBeatID, null, null, null, null, 1000L);
       return nhResponse;
     }
   }
 
+  /** NodeStatusUpdater whose RM client defaults to a MyResourceTracker. */
   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
-    public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+    public ResourceTracker resourceTracker;
     private Context context;
 
     public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
       super(context, dispatcher, healthChecker, metrics);
       this.context = context;
+      // Built here rather than in a field initializer, which would run
+      // before this.context is assigned and so pass null.
+      resourceTracker = new MyResourceTracker(this.context);
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+  }
+
+  /**
+   * NodeStatusUpdater whose RM client is a MyResourceTracker4; used by
+   * testCompletedContainerStatusBackup.
+   */
+  private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
+    public ResourceTracker resourceTracker;
+
+    public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      super(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker4(context);
     }
 
     @Override
     protected ResourceTracker getRMClient() {
       return resourceTracker;
     }
   }
 
   private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
@@ -289,6 +325,25 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  /**
+   * NodeStatusUpdater whose RM client is a MyResourceTracker5, whose
+   * nodeHeartbeat always throws.
+   */
+  private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
+    private ResourceTracker resourceTracker;
+
+    public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      super(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker5();
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+  }
+
   private class MyNodeManager extends NodeManager {
     
     private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -305,6 +356,46 @@ public class TestNodeStatusUpdater {
     }
   }
   
+  /**
+   * NodeManager that uses MyNodeStatusUpdater5 (every heartbeat throws). Its
+   * stop() marks the NM stopped and then meets the test thread at
+   * syncBarrier.
+   */
+  private class MyNodeManager2 extends NodeManager {
+    // volatile: set in stop(), which runs on a thread other than the test
+    // thread that reads it.
+    public volatile boolean isStopped = false;
+    private NodeStatusUpdater nodeStatusUpdater;
+    private final CyclicBarrier syncBarrier;
+
+    public MyNodeManager2(CyclicBarrier syncBarrier) {
+      this.syncBarrier = syncBarrier;
+    }
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      nodeStatusUpdater =
+          new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
+              metrics);
+      return nodeStatusUpdater;
+    }
+
+    @Override
+    public void stop() {
+      super.stop();
+      isStopped = true;
+      try {
+        // Bounded so a stop() call with no partner at the barrier (e.g. a
+        // second stop()) cannot block forever.
+        syncBarrier.await(10, java.util.concurrent.TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        LOG.warn("Could not synchronize with the test thread in stop()", e);
+      }
+    }
+  }
   // 
   private class MyResourceTracker2 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -325,10 +402,9 @@ public class TestNodeStatusUpdater {
       NodeStatus nodeStatus = request.getNodeStatus();
       nodeStatus.setResponseId(heartBeatID++);
       
-      NodeHeartbeatResponse nhResponse = recordFactory
-      .newRecordInstance(NodeHeartbeatResponse.class);
-      nhResponse.setResponseId(heartBeatID);
-      nhResponse.setNodeAction(heartBeatNodeAction);
+      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+          newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
+              null, null, 1000L);
       return nhResponse;
     }
   }
@@ -361,10 +437,9 @@ public class TestNodeStatusUpdater {
       LOG.info("Got heartBeatId: [" + heartBeatID +"]");
       NodeStatus nodeStatus = request.getNodeStatus();
       nodeStatus.setResponseId(heartBeatID++);
-      NodeHeartbeatResponse nhResponse =
-              recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
-      nhResponse.setResponseId(heartBeatID);
-      nhResponse.setNodeAction(heartBeatNodeAction);
+      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+          newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
+              null, null, 1000L);
 
       if (nodeStatus.getKeepAliveApplications() != null
           && nodeStatus.getKeepAliveApplications().size() > 0) {
@@ -386,6 +461,109 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  /**
+   * ResourceTracker that checks the container statuses reported in
+   * heartbeats 0-2 and drops the response to heartbeat 1 by throwing. Any
+   * failed check clears containerStatusBackupSuccessfully.
+   */
+  private class MyResourceTracker4 implements ResourceTracker {
+
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+    private Context context;
+
+    public MyResourceTracker4(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction);
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      try {
+        java.util.List<ContainerStatus> statuses =
+            request.getNodeStatus().getContainersStatuses();
+        if (heartBeatID == 0) {
+          Assert.assertEquals(0, statuses.size());
+          Assert.assertEquals(0, context.getContainers().size());
+        } else if (heartBeatID == 1) {
+          Assert.assertEquals(5, statuses.size());
+          assertContainerStatus(statuses, 0, ContainerState.RUNNING, 1);
+          assertContainerStatus(statuses, 1, ContainerState.RUNNING, 2);
+          assertContainerStatus(statuses, 2, ContainerState.COMPLETE, 3);
+          assertContainerStatus(statuses, 3, ContainerState.COMPLETE, 4);
+          assertContainerStatus(statuses, 4, ContainerState.RUNNING, 5);
+          // Simulate a lost response; the completed statuses of containers
+          // 3 and 4 are expected to be reported again in heartbeat 2.
+          throw new YarnException("Lost the heartbeat response");
+        } else if (heartBeatID == 2) {
+          // The re-reported completed statuses (3, 4) are expected first.
+          Assert.assertEquals(7, statuses.size());
+          assertContainerStatus(statuses, 0, ContainerState.COMPLETE, 3);
+          assertContainerStatus(statuses, 1, ContainerState.COMPLETE, 4);
+          assertContainerStatus(statuses, 2, ContainerState.RUNNING, 1);
+          assertContainerStatus(statuses, 3, ContainerState.RUNNING, 2);
+          assertContainerStatus(statuses, 4, ContainerState.RUNNING, 5);
+          assertContainerStatus(statuses, 5, ContainerState.RUNNING, 6);
+          assertContainerStatus(statuses, 6, ContainerState.COMPLETE, 7);
+        }
+      } catch (AssertionError error) {
+        // Keep the stack trace so the failing check can be located.
+        LOG.warn("Unexpected container statuses in heartbeat " + heartBeatID,
+            error);
+        containerStatusBackupSuccessfully = false;
+      } finally {
+        heartBeatID++;
+      }
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID);
+      NodeHeartbeatResponse nhResponse =
+          YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
+              heartBeatNodeAction, null, null, null, 1000L);
+      return nhResponse;
+    }
+
+    /** Asserts the state and container id of statuses.get(index). */
+    private void assertContainerStatus(
+        java.util.List<ContainerStatus> statuses, int index,
+        ContainerState expectedState, int expectedId) {
+      ContainerStatus status = statuses.get(index);
+      Assert.assertEquals(expectedState, status.getState());
+      Assert.assertEquals(expectedId, status.getContainerId().getId());
+    }
+  }
+
+  /** ResourceTracker that registers normally but fails every heartbeat. */
+  private class MyResourceTracker5 implements ResourceTracker {
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction);
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      // Count the attempt before failing so tests can check how many
+      // heartbeats were tried.
+      heartBeatID++;
+      throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+    }
+  }
+
   @Before
   public void clearError() {
     nmStartError = null;
@@ -473,8 +666,10 @@ public class TestNodeStatusUpdater {
       }
       
       @Override
-      protected void cleanupContainers() {
-        super.cleanupContainers();
+      protected void cleanupContainers(NodeManagerEventType eventType) {
+        // NOTE(review): eventType is ignored and SHUTDOWN is always passed to
+        // the parent; confirm this is intended.
+        super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
         numCleanups.incrementAndGet();
       }
     };
@@ -528,50 +721,6 @@ public class TestNodeStatusUpdater {
   }
 
   @Test
-  public void testNodeReboot() throws Exception {
-    nm = getNodeManager(NodeAction.REBOOT);
-    YarnConfiguration conf = createNMConfig();
-    nm.init(conf);
-    Assert.assertEquals(STATE.INITED, nm.getServiceState());
-    nm.start();
-
-    int waitCount = 0;
-    while (heartBeatID < 1 && waitCount++ != 20) {
-      Thread.sleep(500);
-    }
-    Assert.assertFalse(heartBeatID < 1);
-
-    // NM takes a while to reach the STOPPED state.
-    waitCount = 0;
-    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
-      LOG.info("Waiting for NM to stop..");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
-    
-    waitCount = 0;
-    while (null == rebootedNodeManager && waitCount++ != 20) {
-      LOG.info("Waiting for NM to reinitialize..");
-      Thread.sleep(1000);
-    }
-      
-    waitCount = 0;
-    while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
-      LOG.info("Waiting for NM to start..");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
-
-    rebootedNodeManager.stop();
-    waitCount = 0;
-    while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
-      LOG.info("Waiting for NM to stop..");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
-  }
-  
-  @Test
   public void testNMShutdownForRegistrationFailure() {
 
     nm = new NodeManager() {
@@ -727,6 +876,147 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  /**
+   * Verifies that completed container statuses are kept and re-sent to the
+   * RM when a heartbeat response is lost (see MyResourceTracker4).
+   */
+  @Test(timeout = 20000)
+  public void testCompletedContainerStatusBackup() throws Exception {
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        MyNodeStatusUpdater2 myNodeStatusUpdater =
+            new MyNodeStatusUpdater2(context, dispatcher, healthChecker,
+                metrics);
+        return myNodeStatusUpdater;
+      }
+
+      @Override
+      protected NMContext createNMContext(
+          NMContainerTokenSecretManager containerTokenSecretManager) {
+        return new MyNMContext(containerTokenSecretManager);
+      }
+    };
+
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+    try {
+      int waitCount = 0;
+      while (heartBeatID <= 3 && waitCount++ != 20) {
+        Thread.sleep(500);
+      }
+      // The checks run in heartbeats 0-2; fail if the wait above timed out
+      // before they all happened rather than passing unnoticed.
+      Assert.assertTrue("Only " + heartBeatID + " heartbeats were received",
+          heartBeatID > 2);
+      Assert.assertTrue("ContainerStatus Backup failed",
+          containerStatusBackupSuccessfully);
+    } finally {
+      nm.stop();
+    }
+  }
+
+  /**
+   * Verifies that the NM stops after heartbeats keep failing past the
+   * configured RM connect wait, and counts the heartbeat attempts made.
+   */
+  @Test(timeout = 20000)
+  public void testNodeStatusUpdaterRetryAndNMShutdown() throws Exception {
+    final long connectionWaitSecs = 1;
+    final long connectionRetryIntervalSecs = 1;
+    YarnConfiguration conf = createNMConfig();
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+        connectionWaitSecs);
+    conf.setLong(YarnConfiguration
+        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+        connectionRetryIntervalSecs);
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    nm = new MyNodeManager2(syncBarrier);
+    nm.init(conf);
+    nm.start();
+    // Blocks until MyNodeManager2.stop() reaches the barrier; an interrupt
+    // or a broken barrier propagates and fails the test.
+    syncBarrier.await();
+    Assert.assertTrue(((MyNodeManager2) nm).isStopped);
+    Assert.assertEquals("calculate heartBeatCount based on" +
+        " connectionWaitSecs and RetryIntervalSecs", 2, heartBeatID);
+  }
+
+  /**
+   * NMContext whose container map is filled according to the current
+   * heartBeatID; COMPLETE statuses are also appended to
+   * completedContainerStatusList.
+   */
+  private class MyNMContext extends NMContext {
+    ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentSkipListMap<ContainerId, Container>();
+
+    public MyNMContext(NMContainerTokenSecretManager
+        containerTokenSecretManager) {
+      super(containerTokenSecretManager);
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      // NOTE(review): entries are (re)added on every call while heartBeatID
+      // is 1 or 2 and their ids embed the current time, so repeated calls
+      // within one heartbeat may add extra containers.
+      if (heartBeatID == 0) {
+        return containers;
+      } else if (heartBeatID == 1) {
+        putMockContainer(1, ContainerState.RUNNING);
+        putMockContainer(2, ContainerState.RUNNING);
+        putMockContainer(3, ContainerState.COMPLETE);
+        putMockContainer(4, ContainerState.COMPLETE);
+        putMockContainer(5, ContainerState.RUNNING);
+        return containers;
+      } else if (heartBeatID == 2) {
+        putMockContainer(6, ContainerState.RUNNING);
+        putMockContainer(7, ContainerState.COMPLETE);
+        return containers;
+      } else {
+        containers.clear();
+        return containers;
+      }
+    }
+
+    /**
+     * Puts a mock container with the given id and state into the map and
+     * records COMPLETE statuses in completedContainerStatusList.
+     */
+    private void putMockContainer(int id, ContainerState state) {
+      ContainerStatus containerStatus = createContainerStatus(id, state);
+      containers.put(containerStatus.getContainerId(),
+          getMockContainer(containerStatus));
+      if (state == ContainerState.COMPLETE) {
+        completedContainerStatusList.add(containerStatus);
+      }
+    }
+
+    private ContainerStatus createContainerStatus(int id,
+        ContainerState containerState) {
+      ApplicationId applicationId =
+          BuilderUtils.newApplicationId(System.currentTimeMillis(), id);
+      ApplicationAttemptId applicationAttemptId =
+          BuilderUtils.newApplicationAttemptId(applicationId, id);
+      ContainerId containerId =
+          BuilderUtils.newContainerId(applicationAttemptId, id);
+      ContainerStatus containerStatus =
+          BuilderUtils.newContainerStatus(containerId, containerState,
+              "test_containerStatus: id=" + id + ", containerState: "
+                  + containerState, 0);
+      return containerStatus;
+    }
+
+    private Container getMockContainer(ContainerStatus containerStatus) {
+      Container container = mock(Container.class);
+      when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
+      return container;
+    }
+  }
+
   private void verifyNodeStartFailure(String errMessage) {
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);
@@ -773,12 +1067,6 @@ public class TestNodeStatusUpdater {
         myNodeStatusUpdater.resourceTracker = myResourceTracker2;
         return myNodeStatusUpdater;
       }
-
-      @Override
-      NodeManager createNewNodeManager() {
-        rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
-        return rebootedNodeManager;
-      }
     };
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Fri Apr 12 23:05:28 2013
@@ -17,6 +17,13 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUti
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 public class TestPBRecordImpl {
 
@@ -54,9 +60,8 @@ public class TestPBRecordImpl {
   static LocalResource createResource() {
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
     assertTrue(ret instanceof LocalResourcePBImpl);
-    ret.setResource(
-        ConverterUtils.getYarnUrlFromPath(
-          new Path("hdfs://y.ak:8020/foo/bar")));
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
+        "hdfs://y.ak:8020/foo/bar")));
     ret.setSize(4344L);
     ret.setTimestamp(3141592653589793L);
     ret.setVisibility(LocalResourceVisibility.PUBLIC);
@@ -90,16 +95,30 @@ public class TestPBRecordImpl {
     return ret;
   }
 
-  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
+  /**
+   * Builds a LIVE heartbeat response carrying a single resource spec that
+   * wraps {@link #createResource()}.
+   */
+  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse()
+      throws URISyntaxException {
     LocalizerHeartbeatResponse ret =
       recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
     assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
     ret.setLocalizerAction(LocalizerAction.LIVE);
-    ret.addResource(createResource());
+    LocalResource rsrc = createResource();
+    ArrayList<ResourceLocalizationSpec> rsrcs =
+      new ArrayList<ResourceLocalizationSpec>();
+    ResourceLocalizationSpec resource =
+      recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
+    resource.setResource(rsrc);
+    resource.setDestinationDirectory(ConverterUtils
+      .getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis())));
+    rsrcs.add(resource);
+    ret.setResourceSpecs(rsrcs);
     return ret;
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalResourceStatusSerDe() throws Exception {
     LocalResourceStatus rsrcS = createLocalResourceStatus();
     assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@@ -119,7 +135,7 @@ public class TestPBRecordImpl {
     assertEquals(createResource(), rsrcD.getResource());
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerStatusSerDe() throws Exception {
     LocalizerStatus rsrcS = createLocalizerStatus();
     assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@@ -141,7 +157,7 @@ public class TestPBRecordImpl {
     assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerHeartbeatResponseSerDe() throws Exception {
     LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
     assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@@ -158,8 +174,10 @@ public class TestPBRecordImpl {
       new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
 
     assertEquals(rsrcS, rsrcD);
-    assertEquals(createResource(), rsrcS.getLocalResource(0));
-    assertEquals(createResource(), rsrcD.getLocalResource(0));
+    assertEquals(createResource(),
+        rsrcS.getResourceSpecs().get(0).getResource());
+    assertEquals(createResource(),
+        rsrcD.getResourceSpecs().get(0).getResource());
   }
 
 }