You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/09 03:33:39 UTC

svn commit: r1465853 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemana...

Author: vinodkv
Date: Tue Apr  9 01:33:38 2013
New Revision: 1465853

URL: http://svn.apache.org/r1465853
Log:
YARN-99. Modify private distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Apr  9 01:33:38 2013
@@ -201,6 +201,10 @@ Release 2.0.5-beta - UNRELEASED
     to implement closeable so that they can be stopped when needed via
     RPC.stopProxy(). (Siddharth Seth via vinodkv)
 
+    YARN-99. Modify private distributed cache to localize files such that no
+    local directory hits unix file count limits and thus prevent job failures.
+    (Omkar Vinit Joshi via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java?rev=1465853&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java Tue Apr  9 01:33:38 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.URL;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Pairs a {@link LocalResource} with the {@link URL} of the directory it
+ * should be localized into.
+ */
+@Private
+@VisibleForTesting
+public interface ResourceLocalizationSpec {
+
+  /** @return the resource to localize */
+  LocalResource getResource();
+
+  /** @param rsrc the resource to localize */
+  void setResource(LocalResource rsrc);
+
+  /** @return the directory the resource is to be localized into */
+  URL getDestinationDirectory();
+
+  /** @param destinationDirectory the directory to localize the resource into */
+  void setDestinationDirectory(URL destinationDirectory);
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java?rev=1465853&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java Tue Apr  9 01:33:38 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProtoOrBuilder;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+
+/**
+ * Protocol-buffer backed implementation of {@link ResourceLocalizationSpec}.
+ * Values passed to the setters are cached locally and are only written into
+ * the builder when {@link #getProto()} is called.
+ */
+public class ResourceLocalizationSpecPBImpl extends
+    ProtoBase<ResourceLocalizationSpecProto> implements
+    ResourceLocalizationSpec {
+
+  private ResourceLocalizationSpecProto proto = ResourceLocalizationSpecProto
+    .getDefaultInstance();
+  private ResourceLocalizationSpecProto.Builder builder = null;
+  private boolean viaProto;
+  private LocalResource resource = null;
+  private URL destinationDirectory = null;
+
+  /** Creates an empty spec backed by a fresh builder. */
+  public ResourceLocalizationSpecPBImpl() {
+    builder = ResourceLocalizationSpecProto.newBuilder();
+  }
+
+  /**
+   * Wraps an existing message.
+   *
+   * @param proto the message to read values from
+   */
+  public ResourceLocalizationSpecPBImpl(ResourceLocalizationSpecProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * @return the locally cached resource if there is one, otherwise the one
+   *         stored in the underlying message, or {@code null} if there is none
+   */
+  @Override
+  public LocalResource getResource() {
+    if (resource != null) {
+      return resource;
+    }
+    ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasResource()) {
+      return null;
+    }
+    resource = new LocalResourcePBImpl(p.getResource());
+    return resource;
+  }
+
+  /**
+   * @param rsrc the resource to localize; {@code null} removes any resource
+   *          previously held by this spec
+   */
+  @Override
+  public void setResource(LocalResource rsrc) {
+    maybeInitBuilder();
+    if (rsrc == null) {
+      // Otherwise getResource() would fall back to the stale builder value.
+      builder.clearResource();
+    }
+    resource = rsrc;
+  }
+
+  /**
+   * @return the locally cached destination directory if there is one,
+   *         otherwise the one stored in the underlying message, or
+   *         {@code null} if there is none
+   */
+  @Override
+  public URL getDestinationDirectory() {
+    if (destinationDirectory != null) {
+      return destinationDirectory;
+    }
+    ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasDestinationDirectory()) {
+      return null;
+    }
+    destinationDirectory = new URLPBImpl(p.getDestinationDirectory());
+    return destinationDirectory;
+  }
+
+  /**
+   * @param destinationDirectory the directory to localize into; {@code null}
+   *          removes any directory previously held by this spec
+   */
+  @Override
+  public void setDestinationDirectory(URL destinationDirectory) {
+    maybeInitBuilder();
+    if (destinationDirectory == null) {
+      // Otherwise the getter would fall back to the stale builder value.
+      builder.clearDestinationDirectory();
+    }
+    this.destinationDirectory = destinationDirectory;
+  }
+
+  /**
+   * Flushes locally cached values into the message and returns it.
+   *
+   * @return the message reflecting the current state of this spec
+   */
+  @Override
+  public ResourceLocalizationSpecProto getProto() {
+    mergeLocalToBuilder();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  /** Switches to builder mode, copying the current message if needed. */
+  private synchronized void maybeInitBuilder() {
+    if (builder == null || viaProto) {
+      builder = ResourceLocalizationSpecProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /**
+   * Copies the cached resource and destination directory into the builder,
+   * skipping any value that already equals what the message holds.
+   */
+  private void mergeLocalToBuilder() {
+    ResourceLocalizationSpecProtoOrBuilder l = viaProto ? proto : builder;
+    if (this.resource != null
+        && !(l.getResource()
+          .equals(((LocalResourcePBImpl) resource).getProto()))) {
+      maybeInitBuilder();
+      builder.setResource(((LocalResourcePBImpl) resource).getProto());
+    }
+    if (this.destinationDirectory != null
+        && !(l.getDestinationDirectory()
+          .equals(((URLPBImpl) destinationDirectory).getProto()))) {
+      maybeInitBuilder();
+      builder.setDestinationDirectory(((URLPBImpl) destinationDirectory)
+        .getProto());
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java Tue Apr  9 01:33:38 2013
@@ -18,18 +18,13 @@
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
 
 import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.*;
 
 public interface LocalizerHeartbeatResponse {
-  public LocalizerAction getLocalizerAction();
-  public List<LocalResource> getAllResources();
-  public LocalResource getLocalResource(int i);
 
+  public LocalizerAction getLocalizerAction();
   public void setLocalizerAction(LocalizerAction action);
 
-  public void addAllResources(List<LocalResource> resources);
-  public void addResource(LocalResource resource);
-  public void removeResource(int index);
-  public void clearResources();
-}
+  public List<ResourceLocalizationSpec> getResourceSpecs();
+  public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs);
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java Tue Apr  9 01:33:38 2013
@@ -21,13 +21,14 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.yarn.api.records.LocalResource;
+
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.ResourceLocalizationSpecPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 
@@ -40,13 +41,14 @@ public class LocalizerHeartbeatResponseP
   LocalizerHeartbeatResponseProto.Builder builder = null;
   boolean viaProto = false;
 
-  private List<LocalResource> resources;
+  private List<ResourceLocalizationSpec> resourceSpecs;
 
   public LocalizerHeartbeatResponsePBImpl() {
     builder = LocalizerHeartbeatResponseProto.newBuilder();
   }
 
-  public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) {
+  public LocalizerHeartbeatResponsePBImpl(
+      LocalizerHeartbeatResponseProto proto) {
     this.proto = proto;
     viaProto = true;
   }
@@ -59,7 +61,7 @@ public class LocalizerHeartbeatResponseP
   }
 
   private void mergeLocalToBuilder() {
-    if (resources != null) {
+    if (resourceSpecs != null) {
       addResourcesToProto();
     }
   }
@@ -79,6 +81,7 @@ public class LocalizerHeartbeatResponseP
     viaProto = false;
   }
 
+  @Override
   public LocalizerAction getLocalizerAction() {
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasAction()) {
@@ -87,14 +90,10 @@ public class LocalizerHeartbeatResponseP
     return convertFromProtoFormat(p.getAction());
   }
 
-  public List<LocalResource> getAllResources() {
-    initResources();
-    return this.resources;
-  }
-
-  public LocalResource getLocalResource(int i) {
+  @Override
+  public List<ResourceLocalizationSpec> getResourceSpecs() {
     initResources();
-    return this.resources.get(i);
+    return this.resourceSpecs;
   }
 
   public void setLocalizerAction(LocalizerAction action) {
@@ -106,31 +105,39 @@ public class LocalizerHeartbeatResponseP
     builder.setAction(convertToProtoFormat(action));
   }
 
+  /** Sets the specs to send; {@code null} removes all of them. */
+  public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs) {
+    maybeInitBuilder();
+    if (rsrcs == null) {
+      builder.clearResources();
+    }
+    this.resourceSpecs = rsrcs; // null too, so stale specs are not re-merged
+  }
+
   private void initResources() {
-    if (this.resources != null) {
+    if (this.resourceSpecs != null) {
       return;
     }
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<LocalResourceProto> list = p.getResourcesList();
-    this.resources = new ArrayList<LocalResource>();
-
-    for (LocalResourceProto c : list) {
-      this.resources.add(convertFromProtoFormat(c));
+    List<ResourceLocalizationSpecProto> list = p.getResourcesList();
+    this.resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
+    for (ResourceLocalizationSpecProto c : list) {
+      this.resourceSpecs.add(convertFromProtoFormat(c));
     }
   }
 
   private void addResourcesToProto() {
     maybeInitBuilder();
     builder.clearResources();
-    if (this.resources == null) 
+    if (this.resourceSpecs == null) 
       return;
-    Iterable<LocalResourceProto> iterable =
-        new Iterable<LocalResourceProto>() {
+    Iterable<ResourceLocalizationSpecProto> iterable =
+        new Iterable<ResourceLocalizationSpecProto>() {
       @Override
-      public Iterator<LocalResourceProto> iterator() {
-        return new Iterator<LocalResourceProto>() {
+      public Iterator<ResourceLocalizationSpecProto> iterator() {
+        return new Iterator<ResourceLocalizationSpecProto>() {
 
-          Iterator<LocalResource> iter = resources.iterator();
+          Iterator<ResourceLocalizationSpec> iter = resourceSpecs.iterator();
 
           @Override
           public boolean hasNext() {
@@ -138,8 +145,10 @@ public class LocalizerHeartbeatResponseP
           }
 
           @Override
-          public LocalResourceProto next() {
-            return convertToProtoFormat(iter.next());
+          public ResourceLocalizationSpecProto next() {
+            ResourceLocalizationSpec resource = iter.next();
+            
+            return ((ResourceLocalizationSpecPBImpl)resource).getProto();
           }
 
           @Override
@@ -154,34 +163,10 @@ public class LocalizerHeartbeatResponseP
     builder.addAllResources(iterable);
   }
 
-  public void addAllResources(List<LocalResource> resources) {
-    if (resources == null)
-      return;
-    initResources();
-    this.resources.addAll(resources);
-  }
 
-  public void addResource(LocalResource resource) {
-    initResources();
-    this.resources.add(resource);
-  }
-
-  public void removeResource(int index) {
-    initResources();
-    this.resources.remove(index);
-  }
-
-  public void clearResources() {
-    initResources();
-    this.resources.clear();
-  }
-
-  private LocalResource convertFromProtoFormat(LocalResourceProto p) {
-    return new LocalResourcePBImpl(p);
-  }
-
-  private LocalResourceProto convertToProtoFormat(LocalResource s) {
-    return ((LocalResourcePBImpl)s).getProto();
+  private ResourceLocalizationSpec convertFromProtoFormat(
+      ResourceLocalizationSpecProto p) {
+    return new ResourceLocalizationSpecPBImpl(p);
   }
 
   private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
@@ -191,5 +176,4 @@ public class LocalizerHeartbeatResponseP
   private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
     return LocalizerAction.valueOf(a.name());
   }
-
-}
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue Apr  9 01:33:38 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
@@ -89,8 +91,6 @@ public class ContainerLocalizer {
   private final String localizerId;
   private final FileContext lfs;
   private final Configuration conf;
-  private final LocalDirAllocator appDirs;
-  private final LocalDirAllocator userDirs;
   private final RecordFactory recordFactory;
   private final Map<LocalResource,Future<Path>> pendingResources;
   private final String appCacheDirContextName;
@@ -112,8 +112,6 @@ public class ContainerLocalizer {
     this.recordFactory = recordFactory;
     this.conf = new Configuration();
     this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
-    this.appDirs = new LocalDirAllocator(appCacheDirContextName);
-    this.userDirs = new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, user));
     this.pendingResources = new HashMap<LocalResource,Future<Path>>();
   }
 
@@ -197,10 +195,10 @@ public class ContainerLocalizer {
     return new ExecutorCompletionService<Path>(exec);
   }
 
-  Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
+  Callable<Path> download(Path path, LocalResource rsrc,
       UserGroupInformation ugi) throws IOException {
-    Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
-    return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
+    DiskChecker.checkDir(new File(path.toUri().getRawPath()));
+    return new FSDownload(lfs, ugi, conf, path, rsrc, new Random());
   }
 
   static long getEstimatedSize(LocalResource rsrc) {
@@ -238,25 +236,12 @@ public class ContainerLocalizer {
         LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
         switch (response.getLocalizerAction()) {
         case LIVE:
-          List<LocalResource> newResources = response.getAllResources();
-          for (LocalResource r : newResources) {
-            if (!pendingResources.containsKey(r)) {
-              final LocalDirAllocator lda;
-              switch (r.getVisibility()) {
-              default:
-                LOG.warn("Unknown visibility: " + r.getVisibility()
-                        + ", Using userDirs");
-                //Falling back to userDirs for unknown visibility.
-              case PUBLIC:
-              case PRIVATE:
-                lda = userDirs;
-                break;
-              case APPLICATION:
-                lda = appDirs;
-                break;
-              }
-              // TODO: Synchronization??
-              pendingResources.put(r, cs.submit(download(lda, r, ugi)));
+          List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
+          for (ResourceLocalizationSpec newRsrc : newRsrcs) {
+            if (!pendingResources.containsKey(newRsrc.getResource())) {
+              pendingResources.put(newRsrc.getResource(), cs.submit(download(
+                new Path(newRsrc.getDestinationDirectory().getFile()),
+                newRsrc.getResource(), ugi)));
             }
           }
           break;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Tue Apr  9 01:33:38 2013
@@ -22,8 +22,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Queue;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Apr  9 01:33:38 2013
@@ -80,10 +80,12 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -326,7 +329,7 @@ public class ResourceLocalizationService
     // 0) Create application tracking structs
     String userName = app.getUser();
     privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
-      dispatcher, false, super.getConfig()));
+      dispatcher, true, super.getConfig()));
     if (null != appRsrc.putIfAbsent(
       ConverterUtils.toString(app.getAppId()),
       new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
@@ -476,6 +479,21 @@ public class ResourceLocalizationService
     }
   }
 
+  /** Relative path of a user's file cache: ./USERCACHE/user/FILECACHE. */
+  private String getUserFileCachePath(String user) {
+    final String sep = Path.SEPARATOR;
+    return "." + sep + ContainerLocalizer.USERCACHE + sep + user + sep
+        + ContainerLocalizer.FILECACHE;
+  }
+
+  /** Relative path of an app's cache: ./USERCACHE/user/APPCACHE/appId. */
+  private String getUserAppCachePath(String user, String appId) {
+    final String sep = Path.SEPARATOR;
+    final String userRoot = "." + sep + ContainerLocalizer.USERCACHE + sep
+        + user;
+    return userRoot + sep + ContainerLocalizer.APPCACHE + sep + appId;
+  }
+
   /**
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    */
@@ -803,7 +821,20 @@ public class ResourceLocalizationService
         LocalResource next = findNextResource();
         if (next != null) {
           response.setLocalizerAction(LocalizerAction.LIVE);
-          response.addResource(next);
+          try { // send the next resource together with its destination dir
+            ArrayList<ResourceLocalizationSpec> rsrcs =
+                new ArrayList<ResourceLocalizationSpec>();
+            ResourceLocalizationSpec rsrc =
+                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                  getPathForLocalization(next));
+            rsrcs.add(rsrc);
+            response.setResourceSpecs(rsrcs);
+          } catch (IOException e) {
+            LOG.error("local path for PRIVATE localization could not be found."
+                + " Disks might have failed.", e);
+          } catch (URISyntaxException e) {
+            // TODO fail? Already translated several times...
+          }
         } else if (pending.isEmpty()) {
           // TODO: Synchronization
           response.setLocalizerAction(LocalizerAction.DIE);
@@ -812,7 +843,8 @@ public class ResourceLocalizationService
         }
         return response;
       }
-
+      ArrayList<ResourceLocalizationSpec> rsrcs =
+          new ArrayList<ResourceLocalizationSpec>();
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
         LocalResourceRequest req = null;
@@ -835,6 +867,7 @@ public class ResourceLocalizationService
                   new ResourceLocalizedEvent(req,
                     ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
                     stat.getLocalSize()));
+              localizationCompleted(stat);
             } catch (URISyntaxException e) { }
             if (pending.isEmpty()) {
               // TODO: Synchronization
@@ -844,7 +877,17 @@ public class ResourceLocalizationService
             response.setLocalizerAction(LocalizerAction.LIVE);
             LocalResource next = findNextResource();
             if (next != null) {
-              response.addResource(next);
+              try {
+                ResourceLocalizationSpec resource =
+                    NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                      getPathForLocalization(next));
+                rsrcs.add(resource);
+              } catch (IOException e) {
+                LOG.error("local path for PRIVATE localization could not be " +
+                  "found. Disks might have failed.", e);
+              } catch (URISyntaxException e) {
+                  //TODO fail? Already translated several times...
+              }
             }
             break;
           case FETCH_PENDING:
@@ -854,6 +897,7 @@ public class ResourceLocalizationService
             LOG.info("DEBUG: FAILED " + req, stat.getException());
             assoc.getResource().unlock();
             response.setLocalizerAction(LocalizerAction.DIE);
+            localizationCompleted(stat);
             // TODO: Why is this event going directly to the container. Why not
             // the resource itself? What happens to the resource? Is it removed?
             dispatcher.getEventHandler().handle(
@@ -869,9 +913,53 @@ public class ResourceLocalizationService
             break;
         }
       }
+      response.setResourceSpecs(rsrcs);
       return response;
     }
 
+    /**
+     * Reports the outcome of one fetch to the tracker looked up for the
+     * resource; "success" is passed only when the status is FETCH_SUCCESS.
+     */
+    private void localizationCompleted(LocalResourceStatus stat) {
+      try {
+        LocalResource reported = stat.getResource();
+        LocalResourceRequest request = new LocalResourceRequest(reported);
+        String owner = context.getUser();
+        ApplicationId applicationId = context.getContainerId()
+            .getApplicationAttemptId().getApplicationId();
+        LocalResourcesTracker owningTracker = getLocalResourcesTracker(
+            reported.getVisibility(), owner, applicationId);
+        boolean fetched =
+            stat.getStatus() == ResourceStatusType.FETCH_SUCCESS;
+        owningTracker.localizationCompleted(request, fetched);
+      } catch (URISyntaxException e) {
+        LOG.error("Invalid resource URL specified", e);
+      }
+    }
+
+    // Chooses where rsrc gets localized: dirsHandler picks a writable local
+    // dir for the visibility-specific cache path; the tracker returns the path.
+    private Path getPathForLocalization(LocalResource rsrc) throws IOException,
+        URISyntaxException {
+      String owner = context.getUser();
+      ApplicationId applicationId = context.getContainerId()
+          .getApplicationAttemptId().getApplicationId();
+      LocalResourceVisibility visibility = rsrc.getVisibility();
+      LocalResourcesTracker owningTracker =
+          getLocalResourcesTracker(visibility, owner, applicationId);
+      // PRIVATE uses the user's file cache; any other visibility reaching
+      // this point is handled as APPLICATION (per-application cache).
+      String relativeCacheDir =
+          (visibility == LocalResourceVisibility.PRIVATE)
+              ? getUserFileCachePath(owner)
+              : getUserAppCachePath(owner, applicationId.toString());
+      Path chosenDir = dirsHandler.getLocalPathForWrite(relativeCacheDir,
+          ContainerLocalizer.getEstimatedSize(rsrc), false);
+      return owningTracker.getPathForLocalization(
+          new LocalResourceRequest(rsrc), chosenDir);
+    }
+
     @Override
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
@@ -1033,4 +1121,4 @@ public class ResourceLocalizationService
     del.delete(null, dirPath, new Path[] {});
   }
 
-}
\ No newline at end of file
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java?rev=1465853&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java Tue Apr  9 01:33:38 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/** Static factory for {@link ResourceLocalizationSpec} records. */
+public class NodeManagerBuilderUtils {
+  /** Builds a spec holding {@code rsrc} and {@code path} as a YARN URL. */
+  public static ResourceLocalizationSpec newResourceLocalizationSpec(
+      LocalResource rsrc, Path path) {
+    URL destination = ConverterUtils.getYarnUrlFromPath(path);
+    ResourceLocalizationSpec spec =
+        Records.newRecord(ResourceLocalizationSpec.class);
+    spec.setDestinationDirectory(destination);
+    spec.setResource(rsrc);
+    return spec;
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto Tue Apr  9 01:33:38 2013
@@ -47,7 +47,12 @@ enum LocalizerActionProto {
   DIE = 2;
 }
 
+message ResourceLocalizationSpecProto {
+  optional LocalResourceProto resource = 1; // the resource to localize
+  optional URLProto destination_directory = 2; // URL of its target directory
+}
+
 message LocalizerHeartbeatResponseProto {
   optional LocalizerActionProto action = 1;
-  repeated LocalResourceProto resources = 2;
+  repeated ResourceLocalizationSpecProto resources = 2;
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Tue Apr  9 01:33:38 2013
@@ -17,6 +17,13 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUti
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 public class TestPBRecordImpl {
 
@@ -54,9 +60,8 @@ public class TestPBRecordImpl {
   static LocalResource createResource() {
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
     assertTrue(ret instanceof LocalResourcePBImpl);
-    ret.setResource(
-        ConverterUtils.getYarnUrlFromPath(
-          new Path("hdfs://y.ak:8020/foo/bar")));
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
+      "hdfs://y.ak:8020/foo/bar")));
     ret.setSize(4344L);
     ret.setTimestamp(3141592653589793L);
     ret.setVisibility(LocalResourceVisibility.PUBLIC);
@@ -90,16 +95,27 @@ public class TestPBRecordImpl {
     return ret;
   }
 
-  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
+  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() 
+      throws URISyntaxException {
     LocalizerHeartbeatResponse ret =
       recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
     assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
     ret.setLocalizerAction(LocalizerAction.LIVE);
-    ret.addResource(createResource());
+    LocalResource rsrc = createResource();
+    ArrayList<ResourceLocalizationSpec> rsrcs =
+      new ArrayList<ResourceLocalizationSpec>();
+    ResourceLocalizationSpec resource =
+      recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
+    resource.setResource(rsrc);
+    resource.setDestinationDirectory(ConverterUtils
+      .getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis())));
+    rsrcs.add(resource);
+    ret.setResourceSpecs(rsrcs);
+    System.out.println(resource);
     return ret;
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalResourceStatusSerDe() throws Exception {
     LocalResourceStatus rsrcS = createLocalResourceStatus();
     assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@@ -119,7 +135,7 @@ public class TestPBRecordImpl {
     assertEquals(createResource(), rsrcD.getResource());
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerStatusSerDe() throws Exception {
     LocalizerStatus rsrcS = createLocalizerStatus();
     assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@@ -141,7 +157,7 @@ public class TestPBRecordImpl {
     assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerHeartbeatResponseSerDe() throws Exception {
     LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
     assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@@ -158,8 +174,8 @@ public class TestPBRecordImpl {
       new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
 
     assertEquals(rsrcS, rsrcD);
-    assertEquals(createResource(), rsrcS.getLocalResource(0));
-    assertEquals(createResource(), rsrcD.getLocalResource(0));
+    assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource());
+    assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
   }
 
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java Tue Apr  9 01:33:38 2013
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.no
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 
@@ -28,28 +28,30 @@ public class MockLocalizerHeartbeatRespo
     implements LocalizerHeartbeatResponse {
 
   LocalizerAction action;
-  List<LocalResource> rsrc;
+  List<ResourceLocalizationSpec> resourceSpecs;
 
   MockLocalizerHeartbeatResponse() {
-    rsrc = new ArrayList<LocalResource>();
+    resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
   }
 
   MockLocalizerHeartbeatResponse(
-      LocalizerAction action, List<LocalResource> rsrc) {
+      LocalizerAction action, List<ResourceLocalizationSpec> resources) {
     this.action = action;
-    this.rsrc = rsrc;
+    this.resourceSpecs = resources;
   }
 
   public LocalizerAction getLocalizerAction() { return action; }
-  public List<LocalResource> getAllResources() { return rsrc; }
-  public LocalResource getLocalResource(int i) { return rsrc.get(i); }
   public void setLocalizerAction(LocalizerAction action) {
     this.action = action;
   }
-  public void addAllResources(List<LocalResource> resources) {
-    rsrc.addAll(resources);
+
+  @Override
+  public List<ResourceLocalizationSpec> getResourceSpecs() {
+    return resourceSpecs;
+}
+
+  @Override
+  public void setResourceSpecs(List<ResourceLocalizationSpec> resourceSpecs) {
+    this.resourceSpecs = resourceSpecs;
   }
-  public void addResource(LocalResource resource) { rsrc.add(resource); }
-  public void removeResource(int index) { rsrc.remove(index); }
-  public void clearResources() { rsrc.clear(); }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Tue Apr  9 01:33:38 2013
@@ -50,7 +50,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -66,9 +65,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
@@ -95,12 +96,33 @@ public class TestContainerLocalizer {
   public void testContainerLocalizerMain() throws Exception {
     ContainerLocalizer localizer = setupContainerLocalizerForTest();
 
+    // verify created cache
+    List<Path> privCacheList = new ArrayList<Path>();
+    List<Path> appCacheList = new ArrayList<Path>();
+    for (Path p : localDirs) {
+      Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
+      Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+      privCacheList.add(privcache);
+      Path appDir =
+          new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
+      Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+      appCacheList.add(appcache);
+    }
+
     // mock heartbeat responses from NM
-    LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
-    LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
-    LocalResource rsrcC = getMockRsrc(random,
-        LocalResourceVisibility.APPLICATION);
-    LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+    ResourceLocalizationSpec rsrcA =
+        getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+          privCacheList.get(0));
+    ResourceLocalizationSpec rsrcB =
+        getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+          privCacheList.get(0));
+    ResourceLocalizationSpec rsrcC =
+        getMockRsrc(random, LocalResourceVisibility.APPLICATION,
+          appCacheList.get(0));
+    ResourceLocalizationSpec rsrcD =
+        getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+          privCacheList.get(0));
+    
     when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
             Collections.singletonList(rsrcA)))
@@ -111,27 +133,33 @@ public class TestContainerLocalizer {
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
             Collections.singletonList(rsrcD)))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
-            Collections.<LocalResource>emptyList()))
+            Collections.<ResourceLocalizationSpec>emptyList()))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
             null));
 
-    doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
+    LocalResource tRsrcA = rsrcA.getResource();
+    LocalResource tRsrcB = rsrcB.getResource();
+    LocalResource tRsrcC = rsrcC.getResource();
+    LocalResource tRsrcD = rsrcD.getResource();
+    doReturn(
+      new FakeDownload(rsrcA.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcA),
         isA(UserGroupInformation.class));
-    doReturn(new FakeDownload(rsrcB.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcB),
+    doReturn(
+      new FakeDownload(rsrcB.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcB),
         isA(UserGroupInformation.class));
-    doReturn(new FakeDownload(rsrcC.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcC),
+    doReturn(
+      new FakeDownload(rsrcC.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcC),
         isA(UserGroupInformation.class));
-    doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
+    doReturn(
+      new FakeDownload(rsrcD.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcD),
         isA(UserGroupInformation.class));
 
     // run localization
     assertEquals(0, localizer.runLocalization(nmAddr));
-
-    // verify created cache
     for (Path p : localDirs) {
       Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
       Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
@@ -143,15 +171,14 @@ public class TestContainerLocalizer {
       Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
       verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
     }
-
     // verify tokens read at expected location
     verify(spylfs).open(tokenPath);
 
     // verify downloaded resources reported to NM
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA.getResource())));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB.getResource())));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC.getResource())));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD.getResource())));
 
     // verify all HB use localizerID provided
     verify(nmProxy, never()).heartbeat(argThat(
@@ -306,10 +333,12 @@ public class TestContainerLocalizer {
     return mockRF;
   }
 
-  static LocalResource getMockRsrc(Random r,
-      LocalResourceVisibility vis) {
-    LocalResource rsrc = mock(LocalResource.class);
+  static ResourceLocalizationSpec getMockRsrc(Random r,
+      LocalResourceVisibility vis, Path p) {
+    ResourceLocalizationSpec resourceLocalizationSpec =
+      mock(ResourceLocalizationSpec.class);
 
+    LocalResource rsrc = mock(LocalResource.class);
     String name = Long.toHexString(r.nextLong());
     URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
     when(uri.getScheme()).thenReturn("file");
@@ -322,7 +351,10 @@ public class TestContainerLocalizer {
     when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
     when(rsrc.getVisibility()).thenReturn(vis);
 
-    return rsrc;
+    when(resourceLocalizationSpec.getResource()).thenReturn(rsrc);
+    when(resourceLocalizationSpec.getDestinationDirectory()).
+      thenReturn(ConverterUtils.getYarnUrlFromPath(p));
+    return resourceLocalizationSpec;
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1465853&r1=1465852&r2=1465853&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Apr  9 01:33:38 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -375,7 +377,7 @@ public class TestResourceLocalizationSer
     }
   }
   
-  @Test
+  @Test( timeout = 10000)
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
     Configuration conf = new YarnConfiguration();
@@ -386,12 +388,17 @@ public class TestResourceLocalizationSer
         isA(Path.class), isA(FsPermission.class), anyBoolean());
 
     List<Path> localDirs = new ArrayList<Path>();
-    String[] sDirs = new String[4];
-    for (int i = 0; i < 4; ++i) {
-      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
-      sDirs[i] = localDirs.get(i).toString();
-    }
+    String[] sDirs = new String[1];
+    // Making sure that we have only one local disk so that it will only be
+    // selected for consecutive resource localization calls.  This is required
+    // to test LocalCacheDirectoryManager.
+    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+    sDirs[0] = localDirs.get(0).toString();
+
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    // Adding configuration to make sure there is only one file per
+    // directory
+    conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
     String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     DrainDispatcher dispatcher = new DrainDispatcher();
@@ -452,12 +459,23 @@ public class TestResourceLocalizationSer
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
           isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
           anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
-      final LocalResource resource = getPrivateMockedResource(r);
-      final LocalResourceRequest req = new LocalResourceRequest(resource);
+      final LocalResource resource1 = getPrivateMockedResource(r);
+      LocalResource resource2 = null;
+      do {
+        resource2 = getPrivateMockedResource(r);
+      } while (resource2 == null || resource2.equals(resource1));
+      // above call to make sure we don't get identical resources.
+      
+      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
         new HashMap<LocalResourceVisibility, 
                     Collection<LocalResourceRequest>>();
-      rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
+      List<LocalResourceRequest> privateResourceList =
+          new ArrayList<LocalResourceRequest>();
+      privateResourceList.add(req1);
+      privateResourceList.add(req2);
+      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
       spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       // Sigh. Thread init of private localizer not accessible
       Thread.sleep(1000);
@@ -471,33 +489,64 @@ public class TestResourceLocalizationSer
       Path localizationTokenPath = tokenPathCaptor.getValue();
 
       // heartbeat from localizer
-      LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
       LocalizerStatus stat = mock(LocalizerStatus.class);
       when(stat.getLocalizerId()).thenReturn(ctnrStr);
-      when(rsrcStat.getResource()).thenReturn(resource);
-      when(rsrcStat.getLocalSize()).thenReturn(4344L);
+      when(rsrcStat1.getResource()).thenReturn(resource1);
+      when(rsrcStat2.getResource()).thenReturn(resource2);
+      when(rsrcStat1.getLocalSize()).thenReturn(4344L);
+      when(rsrcStat2.getLocalSize()).thenReturn(2342L);
       URL locPath = getPath("/cache/private/blah");
-      when(rsrcStat.getLocalPath()).thenReturn(locPath);
-      when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrcStat1.getLocalPath()).thenReturn(locPath);
+      when(rsrcStat2.getLocalPath()).thenReturn(locPath);
+      when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
       when(stat.getResources())
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
-        .thenReturn(Collections.singletonList(rsrcStat))
+        .thenReturn(Collections.singletonList(rsrcStat1))
+        .thenReturn(Collections.singletonList(rsrcStat2))
         .thenReturn(Collections.<LocalResourceStatus>emptyList());
 
-      // get rsrc
+      String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
+          Path.SEPARATOR + "user0" + Path.SEPARATOR +
+          ContainerLocalizer.FILECACHE;
+      
+      // get first resource
       LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
+      assertEquals(1, response.getResourceSpecs().size());
+      assertEquals(req1,
+        new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
+      URL localizedPath =
+          response.getResourceSpecs().get(0).getDestinationDirectory();
+      assertTrue(localizedPath.getFile().endsWith(localPath));
+
+      // get second resource
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      assertEquals(1, response.getResourceSpecs().size());
+      assertEquals(req2, new LocalResourceRequest(response.getResourceSpecs()
+        .get(0).getResource()));
+      localizedPath =
+          response.getResourceSpecs().get(0).getDestinationDirectory();
+      // Resource's destination path should now be inside subdirectory 0, since
+      // LocalCacheDirectoryManager is used and we have restricted the number
+      // of files per directory to 1.
+      assertTrue(localizedPath.getFile().endsWith(
+        localPath + Path.SEPARATOR + "0"));
 
       // empty rsrc
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      assertEquals(0, response.getAllResources().size());
+      assertEquals(0, response.getResourceSpecs().size());
 
       // get shutdown
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
 
+
+      dispatcher.await();
       // verify container notification
       ArgumentMatcher<ContainerEvent> matchesContainerLoc =
         new ArgumentMatcher<ContainerEvent>() {
@@ -508,9 +557,9 @@ public class TestResourceLocalizationSer
               && c.getContainerID() == evt.getContainerID();
           }
         };
-      dispatcher.await();
-      verify(containerBus).handle(argThat(matchesContainerLoc));
-      
+      // Total of 2 resource localization calls, one for each resource.
+      verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
+        
       // Verify deletion of localization token.
       verify(delService).delete((String)isNull(), eq(localizationTokenPath));
     } finally {