You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/11 04:08:12 UTC

svn commit: r1466756 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ hadoop-yar...

Author: vinodkv
Date: Thu Apr 11 02:08:11 2013
New Revision: 1466756

URL: http://svn.apache.org/r1466756
Log:
YARN-539. Addressed memory leak of LocalResource objects in NM when a resource localization fails. Contributed by Omkar Vinit Joshi.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Apr 11 02:08:11 2013
@@ -220,6 +220,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-534. Change RM restart recovery to also account for AM max-attempts 
     configuration after the restart. (Jian He via vinodkv)
 
+    YARN-539. Addressed memory leak of LocalResource objects NM when a resource
+    localization fails. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Thu Apr 11 02:08:11 2013
@@ -270,4 +270,11 @@
     <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
   </Match>
 
+  <!-- This type cast problem will never occur. -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl" />
+    <Method name="handle" />
+    <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
+
 </FindBugsFilter>

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Thu Apr 11 02:08:11 2013
@@ -40,8 +40,5 @@ interface LocalResourcesTracker
 
   String getUser();
 
-  // TODO: Remove this in favour of EventHandler.handle
-  void localizationCompleted(LocalResourceRequest req, boolean success);
-
   long nextUniqueNumber();
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Thu Apr 11 02:08:11 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 
 
 /**
@@ -96,13 +97,22 @@ class LocalResourcesTrackerImpl implemen
     this.conf = conf;
   }
 
+  /*
+   * Synchronizing this method for avoiding races due to multiple ResourceEvent's
+   * coming to LocalResourcesTracker from Public/Private localizer and
+   * Resource Localization Service.
+   */
   @Override
-  public void handle(ResourceEvent event) {
+  public synchronized void handle(ResourceEvent event) {
     LocalResourceRequest req = event.getLocalResourceRequest();
     LocalizedResource rsrc = localrsrc.get(req);
     switch (event.getType()) {
-    case REQUEST:
     case LOCALIZED:
+      if (useLocalCacheDirectoryManager) {
+        inProgressLocalResourcesMap.remove(req);
+      }
+      break;
+    case REQUEST:
       if (rsrc != null && (!isResourcePresent(rsrc))) {
         LOG.info("Resource " + rsrc.getLocalPath()
             + " is missing, localizing it again");
@@ -117,10 +127,24 @@ class LocalResourcesTrackerImpl implemen
       break;
     case RELEASE:
       if (null == rsrc) {
-        LOG.info("Release unknown rsrc null (discard)");
+        // The container sent a release event on a resource which 
+        // 1) Failed
+        // 2) Removed for some reason (ex. disk is no longer accessible)
+        ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
+        LOG.info("Container " + relEvent.getContainer()
+            + " sent RELEASE event on a resource request " + req
+            + " not present in cache.");
         return;
       }
       break;
+    case LOCALIZATION_FAILED:
+      decrementFileCountForLocalCacheDirectory(req, null);
+      /*
+       * If resource localization fails then Localized resource will be
+       * removed from local cache.
+       */
+      localrsrc.remove(req);
+      break;
     }
     rsrc.handle(event);
   }
@@ -280,18 +304,6 @@ class LocalResourcesTrackerImpl implemen
   }
 
   @Override
-  public void localizationCompleted(LocalResourceRequest req,
-      boolean success) {
-    if (useLocalCacheDirectoryManager) {
-      if (!success) {
-        decrementFileCountForLocalCacheDirectory(req, null);
-      } else {
-        inProgressLocalResourcesMap.remove(req);
-      }
-    }
-  }
-
-  @Override
   public long nextUniqueNumber() {
     return uniqueNumberGenerator.incrementAndGet();
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Thu Apr 11 02:08:11 2013
@@ -32,10 +32,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -89,6 +91,8 @@ public class LocalizedResource implement
     .addTransition(ResourceState.DOWNLOADING,
         EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
         ResourceEventType.RELEASE, new ReleasePendingTransition())
+    .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
+        ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
 
     // From LOCALIZED (ref >= 0, on disk)
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
@@ -126,12 +130,14 @@ public class LocalizedResource implement
   }
 
   private void release(ContainerId container) {
-    if (!ref.remove(container)) {
-      LOG.info("Attempt to release claim on " + this +
-               " from unregistered container " + container);
-      assert false; // TODO: FIX
+    if (ref.remove(container)) {
+      // updating the timestamp only in case of success.
+      timestamp.set(currentTime());
+    } else {
+      LOG.info("Container " + container
+          + " doesn't exist in the container list of the Resource " + this
+          + " to which it sent RELEASE event");
     }
-    timestamp.set(currentTime());
   }
 
   private long currentTime() {
@@ -251,6 +257,25 @@ public class LocalizedResource implement
   }
 
   /**
+   * Resource localization failed, notify waiting containers.
+   */
+  @SuppressWarnings("unchecked")
+  private static class FetchFailedTransition extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      ResourceFailedLocalizationEvent failedEvent =
+          (ResourceFailedLocalizationEvent) event;
+      Queue<ContainerId> containers = rsrc.ref;
+      Throwable failureCause = failedEvent.getCause();
+      for (ContainerId container : containers) {
+        rsrc.dispatcher.getEventHandler().handle(
+          new ContainerResourceFailedEvent(container, failedEvent
+            .getLocalResourceRequest(), failureCause));
+      }
+    }
+  }
+
+  /**
    * Resource already localized, notify immediately.
    */
   @SuppressWarnings("unchecked") // dispatcher not typed

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Apr 11 02:08:11 2013
@@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
-import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -683,7 +683,6 @@ public class ResourceLocalizationService
     }
 
     @Override
-    @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       try {
         // TODO shutdown, better error handling esp. DU
@@ -699,10 +698,8 @@ public class ResourceLocalizationService
                 return;
               }
               LocalResourceRequest key = assoc.getResource().getRequest();
-              assoc.getResource().handle(
-                  new ResourceLocalizedEvent(key,
-                    local, FileUtil.getDU(new File(local.toUri()))));
-              publicRsrc.localizationCompleted(key, true);
+              publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
+                .getDU(new File(local.toUri()))));
               synchronized (attempts) {
                 attempts.remove(key);
               }
@@ -710,13 +707,10 @@ public class ResourceLocalizationService
               LOG.info("Failed to download rsrc " + assoc.getResource(),
                   e.getCause());
               LocalResourceRequest req = assoc.getResource().getRequest();
-              dispatcher.getEventHandler().handle(
-                  new ContainerResourceFailedEvent(
-                    assoc.getContext().getContainerId(),
-                    req, e.getCause()));
-              publicRsrc.localizationCompleted(req, false);
-              List<LocalizerResourceRequestEvent> reqs;
+              publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
+                .getCause()));
               synchronized (attempts) {
+                List<LocalizerResourceRequestEvent> reqs;
                 reqs = attempts.get(req);
                 if (null == reqs) {
                   LOG.error("Missing pending list for " + req);
@@ -724,13 +718,6 @@ public class ResourceLocalizationService
                 }
                 attempts.remove(req);
               }
-              // let the other containers know about the localization failure
-              for (LocalizerResourceRequestEvent reqEvent : reqs) {
-                dispatcher.getEventHandler().handle(
-                    new ContainerResourceFailedEvent(
-                        reqEvent.getContext().getContainerId(),
-                        reqEvent.getResource().getRequest(), e.getCause()));
-              }
             } catch (CancellationException e) {
               // ignore; shutting down
             }
@@ -810,13 +797,14 @@ public class ResourceLocalizationService
       return null;
     }
 
-    // TODO this sucks. Fix it later
-    @SuppressWarnings("unchecked") // dispatcher not typed
     LocalizerHeartbeatResponse update(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
 
+      String user = context.getUser();
+      ApplicationId applicationId =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
       // The localizer has just spawned. Start giving it resources for
       // remote-fetching.
       if (remoteResourceStatuses.isEmpty()) {
@@ -847,6 +835,11 @@ public class ResourceLocalizationService
       }
       ArrayList<ResourceLocalizationSpec> rsrcs =
           new ArrayList<ResourceLocalizationSpec>();
+       /*
+        * TODO : It doesn't support multiple downloads per ContainerLocalizer
+        * at the same time. We need to think whether we should support this.
+        */
+
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
         LocalResourceRequest req = null;
@@ -865,11 +858,10 @@ public class ResourceLocalizationService
           case FETCH_SUCCESS:
             // notify resource
             try {
-              assoc.getResource().handle(
-                  new ResourceLocalizedEvent(req,
-                    ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
-                    stat.getLocalSize()));
-              localizationCompleted(stat);
+            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+              .handle(
+                new ResourceLocalizedEvent(req, ConverterUtils
+                  .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
             } catch (URISyntaxException e) { }
             if (pending.isEmpty()) {
               // TODO: Synchronization
@@ -899,19 +891,16 @@ public class ResourceLocalizationService
             LOG.info("DEBUG: FAILED " + req, stat.getException());
             assoc.getResource().unlock();
             response.setLocalizerAction(LocalizerAction.DIE);
-            localizationCompleted(stat);
-            // TODO: Why is this event going directly to the container. Why not
-            // the resource itself? What happens to the resource? Is it removed?
-            dispatcher.getEventHandler().handle(
-                new ContainerResourceFailedEvent(context.getContainerId(),
-                  req, stat.getException()));
+            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+              .handle(
+                new ResourceFailedLocalizationEvent(req, stat.getException()));
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
             response.setLocalizerAction(LocalizerAction.DIE);
-            dispatcher.getEventHandler().handle(
-                new ContainerResourceFailedEvent(context.getContainerId(),
-                  req, stat.getException()));
+            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+              .handle(
+                new ResourceFailedLocalizationEvent(req, stat.getException()));
             break;
         }
       }
@@ -919,27 +908,6 @@ public class ResourceLocalizationService
       return response;
     }
 
-    private void localizationCompleted(LocalResourceStatus stat) {
-      try {
-        LocalResource rsrc = stat.getResource();
-        LocalResourceRequest key = new LocalResourceRequest(rsrc);
-        String user = context.getUser();
-        ApplicationId appId =
-            context.getContainerId().getApplicationAttemptId()
-              .getApplicationId();
-        LocalResourceVisibility vis = rsrc.getVisibility();
-        LocalResourcesTracker tracker =
-            getLocalResourcesTracker(vis, user, appId);
-        if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
-          tracker.localizationCompleted(key, true);
-        } else {
-          tracker.localizationCompleted(key, false);
-        }
-      } catch (URISyntaxException e) {
-        LOG.error("Invalid resource URL specified", e);
-      }
-    }
-
     private Path getPathForLocalization(LocalResource rsrc) throws IOException,
         URISyntaxException {
       String user = context.getUser();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java Thu Apr 11 02:08:11 2013
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
 enum ResourceState {
   INIT,
   DOWNLOADING,
-  LOCALIZED
+  LOCALIZED,
+  FAILED
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Thu Apr 11 02:08:11 2013
@@ -29,5 +29,7 @@ public enum ResourceEventType {
   /** See {@link ResourceLocalizedEvent} */ 
   LOCALIZED,
   /** See {@link ResourceReleaseEvent} */
-  RELEASE
+  RELEASE,
+  /** See {@link ResourceFailedLocalizationEvent} */
+  LOCALIZATION_FAILED
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java?rev=1466756&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceFailedLocalizationEvent.java Thu Apr 11 02:08:11 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+/**
+ * This event is sent by the localizer in case resource localization fails for
+ * the requested resource.
+ */
+public class ResourceFailedLocalizationEvent extends ResourceEvent {
+
+  private Throwable cause;
+
+  public ResourceFailedLocalizationEvent(LocalResourceRequest rsrc,
+      Throwable cause) {
+    super(rsrc, ResourceEventType.LOCALIZATION_FAILED);
+    this.cause = cause;
+  }
+
+  public Throwable getCause() {
+    return cause;
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1466756&r1=1466755&r2=1466756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Thu Apr 11 02:08:11 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -41,11 +42,15 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -224,6 +229,142 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  /**
+   * Verifies that a failed localization notifies every waiting container
+   * and evicts the resource, so that a later request starts a fresh entry.
+   */
+  @Test(timeout = 10000)
+  @SuppressWarnings("unchecked")
+  public void testLocalResourceCache() {
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      Configuration conf = new Configuration();
+      dispatcher = createDispatcher(conf);
+
+      EventHandler<LocalizerEvent> localizerEventHandler =
+          mock(EventHandler.class);
+      EventHandler<ContainerEvent> containerEventHandler =
+          mock(EventHandler.class);
+
+      // Registering event handlers.
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
+          new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      LocalResourcesTracker tracker =
+          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
+
+      LocalResourceRequest lr =
+          createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
+
+      // Creating 2 containers for same application which will be requesting
+      // same local resource.
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+      ResourceEvent reqEvent1 =
+          new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc1);
+
+      // No resource request is initially present in local cache
+      Assert.assertEquals(0, localrsrc.size());
+
+      // Container-1 requesting local resource.
+      tracker.handle(reqEvent1);
+
+      // New localized Resource should have been added to local resource map
+      // and the requesting container will be added to its waiting queue.
+      Assert.assertEquals(1, localrsrc.size());
+      Assert.assertTrue(localrsrc.containsKey(lr));
+      Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+      Assert.assertTrue(localrsrc.get(lr).ref.contains(cId1));
+      Assert.assertEquals(ResourceState.DOWNLOADING, localrsrc.get(lr)
+        .getState());
+
+      // Container 2 requesting the resource
+      ContainerId cId2 = BuilderUtils.newContainerId(1, 1, 1, 2);
+      LocalizerContext lc2 = new LocalizerContext(user, cId2, null);
+      ResourceEvent reqEvent2 =
+          new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
+      tracker.handle(reqEvent2);
+
+      // Container 2 should have been added to the waiting queue of the local
+      // resource
+      Assert.assertEquals(2, localrsrc.get(lr).getRefCount());
+      Assert.assertTrue(localrsrc.get(lr).ref.contains(cId2));
+
+      // Failing resource localization
+      ResourceEvent resourceFailedEvent =
+          new ResourceFailedLocalizationEvent(lr, new Exception("test"));
+      // Backing up the resource to track its state change as it will be
+      // removed after the failed event.
+      LocalizedResource localizedResource = localrsrc.get(lr);
+      tracker.handle(resourceFailedEvent);
+      // Drain the asynchronous dispatcher so verify() cannot race delivery.
+      dispatcher.await();
+
+      // After receiving failed resource event; all waiting containers will be
+      // notified with Container Resource Failed Event.
+      Assert.assertEquals(0, localrsrc.size());
+      verify(containerEventHandler, times(2)).handle(
+        isA(ContainerResourceFailedEvent.class));
+      Assert.assertEquals(ResourceState.FAILED, localizedResource.getState());
+
+      // Container-1 releases the resource. It is already deleted from the
+      // cache, so this call should return silently without an exception.
+      ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
+      tracker.handle(relEvent1);
+
+      // Container-3 now requests for the same resource. This request call
+      // is coming prior to Container-2's release call.
+      ContainerId cId3 = BuilderUtils.newContainerId(1, 1, 1, 3);
+      LocalizerContext lc3 = new LocalizerContext(user, cId3, null);
+      ResourceEvent reqEvent3 =
+          new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
+      tracker.handle(reqEvent3);
+
+      // Local resource cache now should have the requested resource and the
+      // number of waiting containers should be 1.
+      Assert.assertEquals(1, localrsrc.size());
+      Assert.assertTrue(localrsrc.containsKey(lr));
+      Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+      Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3));
+
+      // Container-2 Releases the resource
+      ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
+      tracker.handle(relEvent2);
+
+      // Making sure that there is no change in the cache after the release.
+      Assert.assertEquals(1, localrsrc.size());
+      Assert.assertTrue(localrsrc.containsKey(lr));
+      Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+      Assert.assertTrue(localrsrc.get(lr).ref.contains(cId3));
+
+      // Sending ResourceLocalizedEvent to tracker. In turn resource should
+      // send Container Resource Localized Event to waiting containers.
+      Path localizedPath = new Path("/tmp/file1");
+      ResourceLocalizedEvent localizedEvent =
+          new ResourceLocalizedEvent(lr, localizedPath, 123L);
+      tracker.handle(localizedEvent);
+      // Drain the dispatcher, then verify ContainerResourceLocalizedEvent.
+      dispatcher.await();
+      verify(containerEventHandler, times(1)).handle(
+        isA(ContainerResourceLocalizedEvent.class));
+      Assert.assertEquals(ResourceState.LOCALIZED, localrsrc.get(lr)
+        .getState());
+      Assert.assertEquals(1, localrsrc.get(lr).getRefCount());
+
+      // Container-3 releasing the resource.
+      ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
+      tracker.handle(relEvent3);
+      Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   @Test(timeout = 100000)
   @SuppressWarnings("unchecked")
   public void testHierarchicalLocalCacheDirectories() {
@@ -266,19 +407,25 @@ public class TestLocalResourcesTrackerIm
       // Simulate the process of localization of lr1
       Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
       // Simulate lr1 getting localized
-      ResourceLocalizedEvent rle =
+      ResourceLocalizedEvent rle1 =
           new ResourceLocalizedEvent(lr1,
               new Path(hierarchicalPath1.toUri().toString() +
                   Path.SEPARATOR + "file1"), 120);
-      tracker.handle(rle);
+      tracker.handle(rle1);
       // Localization successful.
-      tracker.localizationCompleted(lr1, true);
 
       LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
           LocalResourceVisibility.PUBLIC);
+      // Container 1 requests lr2 to be localized.
+      ResourceEvent reqEvent2 =
+          new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
+      tracker.handle(reqEvent2);
+
       Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
       // localization failed.
-      tracker.localizationCompleted(lr2, false);
+      ResourceFailedLocalizationEvent rfe2 =
+          new ResourceFailedLocalizationEvent(lr2, new Exception("Test"));
+      tracker.handle(rfe2);
 
       /*
        * The path returned for two localization should be different because we
@@ -292,7 +439,11 @@ public class TestLocalResourcesTrackerIm
           LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent3);
       Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
-      tracker.localizationCompleted(lr3, true);
+      // localization successful
+      ResourceLocalizedEvent rle3 =
+          new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
+            .toString() + Path.SEPARATOR + "file3"), 120);
+      tracker.handle(rle3);
 
       // Verifying that path created is inside the subdirectory
       Assert.assertEquals(hierarchicalPath3.toUri().toString(),