You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/09/13 08:29:21 UTC

[07/50] [abbrv] hadoop git commit: YARN-5576. Allow resource localization while container is running. Contributed by Jian He.

YARN-5576. Allow resource localization while container is running. Contributed by Jian He.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6fcfe28
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6fcfe28
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6fcfe28

Branch: refs/heads/HDFS-10285
Commit: e6fcfe28e304062c7d09231db757acb2953703ce
Parents: 62a9667
Author: Varun Vasudev <vv...@apache.org>
Authored: Tue Sep 6 20:01:45 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Tue Sep 6 20:01:45 2016 +0530

----------------------------------------------------------------------
 .../server/nodemanager/ContainerExecutor.java   |   9 +
 .../hadoop/yarn/server/nodemanager/Context.java |   2 +
 .../nodemanager/DefaultContainerExecutor.java   |   6 +
 .../nodemanager/DockerContainerExecutor.java    |   6 +
 .../nodemanager/LinuxContainerExecutor.java     |   5 +
 .../yarn/server/nodemanager/NodeManager.java    |  12 ++
 .../containermanager/ContainerManagerImpl.java  |  70 +++++--
 .../containermanager/container/Container.java   |   8 +-
 .../container/ContainerImpl.java                | 207 ++++++++-----------
 .../launcher/ContainerLaunch.java               |   4 +-
 .../localizer/ResourceLocalizationService.java  |  21 +-
 .../containermanager/localizer/ResourceSet.java | 192 +++++++++++++++++
 .../ContainerLocalizationRequestEvent.java      |   2 +-
 .../localizer/event/LocalizationEventType.java  |   2 +-
 .../timelineservice/NMTimelinePublisher.java    |   2 +-
 .../nodemanager/DummyContainerManager.java      |   2 +-
 .../TestContainerManagerWithLCE.java            |  13 +-
 .../amrmproxy/BaseAMRMProxyTest.java            |   6 +
 .../containermanager/TestContainerManager.java  | 137 +++++++++++-
 .../TestResourceLocalizationService.java        |   7 +-
 .../TestContainersMonitorResourceChange.java    |   7 +
 .../nodemanager/webapp/MockContainer.java       |  17 +-
 22 files changed, 577 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 818b0ea..918c30a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -182,6 +182,15 @@ public abstract class ContainerExecutor implements Configurable {
       throws IOException, InterruptedException;
 
   /**
+   * Create a symlink file which points to the target.
+   * @param target The target for symlink
+   * @param symlink the symlink file
+   * @throws IOException Error when creating symlinks
+   */
+  public abstract void symLink(String target, String symlink)
+      throws IOException;
+
+  /**
    * Check if a container is alive.
    * @param ctx Encapsulates information necessary for container liveness check.
    * @return true if container is still alive

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 88bc29c..e888393 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -126,4 +126,6 @@ public interface Context {
   void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher);
 
   NMTimelinePublisher getNMTimelinePublisher();
+
+  ContainerExecutor getContainerExecutor();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 13ad9ac..9a0549d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -511,6 +512,11 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     }
   }
 
+  @Override
+  public void symLink(String target, String symlink) throws IOException {
+    FileUtil.symLink(target, symlink);
+  }
+
   /** Permissions for user dir.
    * $local.dir/usercache/$user */
   static final short USER_PERM = (short)0750;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
index ebf9566..1390214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
@@ -490,6 +490,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
     }
   }
 
+  @Override
+  public void symLink(String target, String symlink)
+      throws IOException {
+
+  }
+
   /**
    * Converts a directory list to a docker mount string
    * @param dirs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index 6890b25..cc12b20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -683,6 +683,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   }
 
   @Override
+  public void symLink(String target, String symlink) {
+
+  }
+
+  @Override
   public boolean isContainerAlive(ContainerLivenessContext ctx)
       throws IOException {
     String user = ctx.getUser();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 280a086..7f13334 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -343,6 +343,9 @@ public class NodeManager extends CompositeService
     this.context = createNMContext(containerTokenSecretManager,
         nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
 
+
+    ((NMContext)context).setContainerExecutor(exec);
+
     nodeLabelsProvider = createNodeLabelsProvider(conf);
 
     if (null == nodeLabelsProvider) {
@@ -509,6 +512,7 @@ public class NodeManager extends CompositeService
     private OpportunisticContainerAllocator containerAllocator;
 
     private final QueuingContext queuingContext;
+    private ContainerExecutor executor;
 
     private NMTimelinePublisher nmTimelinePublisher;
 
@@ -702,6 +706,14 @@ public class NodeManager extends CompositeService
     public NMTimelinePublisher getNMTimelinePublisher() {
       return nmTimelinePublisher;
     }
+
+    public ContainerExecutor getContainerExecutor() {
+      return this.executor;
+    }
+
+    public void setContainerExecutor(ContainerExecutor executor) {
+      this.executor = executor;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 8c060bc..52d8566 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -18,25 +18,8 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
-import static org.apache.hadoop.service.Service.STATE.STARTED;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -76,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -129,7 +113,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
@@ -154,8 +140,25 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import static org.apache.hadoop.service.Service.STATE.STARTED;
 
 public class ContainerManagerImpl extends CompositeService implements
     ContainerManager {
@@ -1525,6 +1528,31 @@ public class ContainerManagerImpl extends CompositeService implements
   public ResourceLocalizationResponse localize(
       ResourceLocalizationRequest request) throws YarnException, IOException {
 
+    ContainerId containerId = request.getContainerId();
+    Container container = context.getContainers().get(containerId);
+    if (container == null) {
+      throw new YarnException("Specified " + containerId + " does not exist!");
+    }
+    if (!container.getContainerState()
+        .equals(org.apache.hadoop.yarn.server.nodemanager.
+            containermanager.container.ContainerState.RUNNING)) {
+      throw new YarnException(
+          containerId + " is at " + container.getContainerState()
+              + " state. Not able to localize new resources.");
+    }
+
+    try {
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+          container.getResourceSet().addResources(request.getLocalResources());
+      if (req != null && !req.isEmpty()) {
+        dispatcher.getEventHandler()
+            .handle(new ContainerLocalizationRequestEvent(container, req));
+      }
+    } catch (URISyntaxException e) {
+      LOG.info("Error when parsing local resource URI for " + containerId, e);
+      throw new YarnException(e);
+    }
+
     return ResourceLocalizationResponse.newInstance();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 84d3cb2..c4cea18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -31,6 +28,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
+
+import java.util.List;
+import java.util.Map;
 
 public interface Container extends EventHandler<ContainerEvent> {
 
@@ -74,4 +75,5 @@ public interface Container extends EventHandler<ContainerEvent> {
 
   Priority getPriority();
 
+  ResourceSet getResourceSet();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 0244d90..ce9e581 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -18,18 +18,15 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
@@ -124,20 +122,7 @@ public class ContainerImpl implements Container {
   private final Configuration daemonConf;
 
   private static final Log LOG = LogFactory.getLog(ContainerImpl.class);
-  private final Map<LocalResourceRequest,List<String>> pendingResources =
-    new HashMap<LocalResourceRequest,List<String>>();
-  private final Map<Path,List<String>> localizedResources =
-    new HashMap<Path,List<String>>();
-  private final List<LocalResourceRequest> publicRsrcs =
-    new ArrayList<LocalResourceRequest>();
-  private final List<LocalResourceRequest> privateRsrcs =
-    new ArrayList<LocalResourceRequest>();
-  private final List<LocalResourceRequest> appRsrcs =
-    new ArrayList<LocalResourceRequest>();
-  private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
-      new ConcurrentHashMap<LocalResourceRequest, Path>();
-  private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
-      new ConcurrentHashMap<LocalResourceRequest, Boolean>();
+
 
   // whether container has been recovered after a restart
   private RecoveredContainerStatus recoveredStatus =
@@ -145,6 +130,7 @@ public class ContainerImpl implements Container {
   // whether container was marked as killed after recovery
   private boolean recoveredAsKilled = false;
   private Context context;
+  private ResourceSet resourceSet;
 
   public ContainerImpl(Configuration conf, Dispatcher dispatcher,
       ContainerLaunchContext launchContext, Credentials creds,
@@ -204,6 +190,7 @@ public class ContainerImpl implements Container {
 
     stateMachine = stateMachineFactory.make(this);
     this.context = context;
+    this.resourceSet = new ResourceSet();
   }
 
   // constructor for a recovered container
@@ -313,6 +300,12 @@ public class ContainerImpl implements Container {
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new RetryFailureTransition())
     .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        new ResourceLocalizedWhileRunningTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+        ContainerEventType.RESOURCE_FAILED,
+        new ResourceLocalizationFailedWhileRunningTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
@@ -470,7 +463,7 @@ public class ContainerImpl implements Container {
     try {
       if (ContainerState.LOCALIZED == getContainerState()
           || ContainerState.RELAUNCHING == getContainerState()) {
-        return localizedResources;
+        return resourceSet.getLocalizedResources();
       } else {
         return null;
       }
@@ -591,6 +584,11 @@ public class ContainerImpl implements Container {
     this.logDir = logDir;
   }
 
+  @Override
+  public ResourceSet getResourceSet() {
+    return this.resourceSet;
+  }
+
   @SuppressWarnings("unchecked")
   private void sendFinishedEvents() {
     // Inform the application
@@ -653,7 +651,7 @@ public class ContainerImpl implements Container {
     for (String s : diags) {
       this.diagnostics.append(s);
     }
-    if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) {
+    if (diagnostics.length() > diagnosticsMaxSize) {
       diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
     }
     try {
@@ -667,17 +665,7 @@ public class ContainerImpl implements Container {
   @SuppressWarnings("unchecked") // dispatcher not typed
   public void cleanup() {
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
-      new HashMap<LocalResourceVisibility, 
-                  Collection<LocalResourceRequest>>();
-    if (!publicRsrcs.isEmpty()) {
-      rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
-    }
-    if (!privateRsrcs.isEmpty()) {
-      rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
-    }
-    if (!appRsrcs.isEmpty()) {
-      rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
-    }
+        resourceSet.getAllResourcesByVisibility();
     dispatcher.getEventHandler().handle(
         new ContainerLocalizationCleanupEvent(this, rsrc));
   }
@@ -697,7 +685,7 @@ public class ContainerImpl implements Container {
    * message.
    * 
    * If there are resources to localize, sends a
-   * ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES) 
+   * ContainerLocalizationRequest (LOCALIZE_CONTAINER_RESOURCES)
    * to the ResourceLocalizationManager and enters LOCALIZING state.
    * 
    * If there are no resources to localize, sends LAUNCH_CONTAINER event
@@ -749,39 +737,15 @@ public class ContainerImpl implements Container {
       }
 
       container.containerLocalizationStartTime = clock.getTime();
+
       // Send requests for public, private resources
       Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
       if (!cntrRsrc.isEmpty()) {
         try {
-          for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
-            try {
-              LocalResourceRequest req =
-                  new LocalResourceRequest(rsrc.getValue());
-              List<String> links = container.pendingResources.get(req);
-              if (links == null) {
-                links = new ArrayList<String>();
-                container.pendingResources.put(req, links);
-              }
-              links.add(rsrc.getKey());
-              storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
-                  .getShouldBeUploadedToSharedCache());
-              switch (rsrc.getValue().getVisibility()) {
-              case PUBLIC:
-                container.publicRsrcs.add(req);
-                break;
-              case PRIVATE:
-                container.privateRsrcs.add(req);
-                break;
-              case APPLICATION:
-                container.appRsrcs.add(req);
-                break;
-              }
-            } catch (URISyntaxException e) {
-              LOG.info("Got exception parsing " + rsrc.getKey()
-                  + " and value " + rsrc.getValue());
-              throw e;
-            }
-          }
+          Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+              container.resourceSet.addResources(ctxt.getLocalResources());
+          container.dispatcher.getEventHandler().handle(
+              new ContainerLocalizationRequestEvent(container, req));
         } catch (URISyntaxException e) {
           // malformed resource; abort container launch
           LOG.warn("Failed to parse resource-request", e);
@@ -789,21 +753,6 @@ public class ContainerImpl implements Container {
           container.metrics.endInitingContainer();
           return ContainerState.LOCALIZATION_FAILED;
         }
-        Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
-            new LinkedHashMap<LocalResourceVisibility,
-                        Collection<LocalResourceRequest>>();
-        if (!container.publicRsrcs.isEmpty()) {
-          req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
-        }
-        if (!container.privateRsrcs.isEmpty()) {
-          req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
-        }
-        if (!container.appRsrcs.isEmpty()) {
-          req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
-        }
-        
-        container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationRequestEvent(container, req));
         return ContainerState.LOCALIZING;
       } else {
         container.sendLaunchEvent();
@@ -814,27 +763,6 @@ public class ContainerImpl implements Container {
   }
 
   /**
-   * Store the resource's shared cache upload policies
-   * Given LocalResourceRequest can be shared across containers in
-   * LocalResourcesTrackerImpl, we preserve the upload policies here.
-   * In addition, it is possible for the application to create several
-   * "identical" LocalResources as part of
-   * ContainerLaunchContext.setLocalResources with different symlinks.
-   * There is a corner case where these "identical" local resources have
-   * different upload policies. For that scenario, upload policy will be set to
-   * true as long as there is at least one LocalResource entry with
-   * upload policy set to true.
-   */
-  private static void storeSharedCacheUploadPolicy(ContainerImpl container,
-      LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
-    Boolean storedUploadPolicy =
-        container.resourcesUploadPolicies.get(resourceRequest);
-    if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
-      container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
-    }
-  }
-
-  /**
    * Transition when one of the requested resources for this container
    * has been successfully localized.
    */
@@ -847,22 +775,21 @@ public class ContainerImpl implements Container {
       ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
       LocalResourceRequest resourceRequest = rsrcEvent.getResource();
       Path location = rsrcEvent.getLocation();
-      List<String> syms = container.pendingResources.remove(resourceRequest);
+      List<String> syms =
+          container.resourceSet.resourceLocalized(resourceRequest, location);
       if (null == syms) {
-        LOG.warn("Localized unknown resource " + resourceRequest +
-                 " for container " + container.containerId);
-        assert false;
-        // fail container?
+        LOG.info("Localized resource " + resourceRequest +
+            " for container " + container.containerId);
         return ContainerState.LOCALIZING;
       }
-      container.localizedResources.put(location, syms);
 
       // check to see if this resource should be uploaded to the shared cache
       // as well
       if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
-        container.resourcesToBeUploaded.put(resourceRequest, location);
+        container.resourceSet.getResourcesToBeUploaded()
+            .put(resourceRequest, location);
       }
-      if (!container.pendingResources.isEmpty()) {
+      if (!container.resourceSet.getPendingResources().isEmpty()) {
         return ContainerState.LOCALIZING;
       }
 
@@ -884,7 +811,8 @@ public class ContainerImpl implements Container {
           && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
         // kick off uploads to the shared cache
         container.dispatcher.getEventHandler().handle(
-            new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
+            new SharedCacheUploadEvent(
+                container.resourceSet.getResourcesToBeUploaded(), container
                 .getLaunchContext(), container.getUser(),
                 SharedCacheUploadEventType.UPLOAD));
       }
@@ -894,6 +822,56 @@ public class ContainerImpl implements Container {
   }
 
   /**
+   * Resource is localized while the container is running - create symlinks
+   */
+  static class ResourceLocalizedWhileRunningTransition
+      extends ContainerTransition {
+
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      ContainerResourceLocalizedEvent rsrcEvent =
+          (ContainerResourceLocalizedEvent) event;
+      List<String> links = container.resourceSet
+          .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
+      // creating symlinks.
+      for (String link : links) {
+        try {
+          String linkFile = new Path(container.workDir, link).toString();
+          if (new File(linkFile).exists()) {
+            LOG.info("Symlink file already exists: " + linkFile);
+          } else {
+            container.context.getContainerExecutor()
+                .symLink(rsrcEvent.getLocation().toString(), linkFile);
+            LOG.info("Created symlink: " + linkFile + " -> " + rsrcEvent
+                .getLocation());
+          }
+        } catch (IOException e) {
+          String message = String
+              .format("Error when creating symlink %s -> %s", link,
+                  rsrcEvent.getLocation());
+          LOG.error(message, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Handles a resource localization failure reported for a running container.
+   */
+  static class ResourceLocalizationFailedWhileRunningTransition
+      extends ContainerTransition {
+
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      ContainerResourceFailedEvent failure =
+          (ContainerResourceFailedEvent) event;
+      LocalResourceRequest failedResource = failure.getResource();
+      container.resourceSet.resourceLocalizationFailed(failedResource);
+      container.addDiagnostics(failure.getDiagnosticMessage());
+    }
+  }
+
+  /**
    * Transition from LOCALIZED state to RUNNING state upon receiving
    * a CONTAINER_LAUNCHED event
    */
@@ -1136,17 +1114,10 @@ public class ContainerImpl implements Container {
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
-      ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
-      List<String> syms =
-          container.pendingResources.remove(rsrcEvent.getResource());
-      if (null == syms) {
-        LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
-                 " for container " + container.containerId);
-        assert false;
-        // fail container?
-        return;
-      }
-      container.localizedResources.put(rsrcEvent.getLocation(), syms);
+      ContainerResourceLocalizedEvent rsrcEvent =
+          (ContainerResourceLocalizedEvent) event;
+      container.resourceSet
+          .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
     }
   }
 
@@ -1402,7 +1373,7 @@ public class ContainerImpl implements Container {
    */
   private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
       LocalResourceRequest resource) {
-    return container.resourcesUploadPolicies.get(resource);
+    return container.resourceSet.getResourcesUploadPolicies().get(resource);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 14190fc..d8239ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -1201,16 +1201,16 @@ public class ContainerLaunch implements Callable<Integer> {
 
   private void recordContainerLogDir(ContainerId containerId,
       String logDir) throws IOException{
+    container.setLogDir(logDir);
     if (container.isRetryContextSet()) {
-      container.setLogDir(logDir);
       context.getNMStateStore().storeContainerLogDir(containerId, logDir);
     }
   }
 
   private void recordContainerWorkDir(ContainerId containerId,
       String workDir) throws IOException{
+    container.setWorkDir(workDir);
     if (container.isRetryContextSet()) {
-      container.setWorkDir(workDir);
       context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 0ec2f29..b281ef5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
@@ -135,7 +136,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -419,7 +419,7 @@ public class ResourceLocalizationService extends CompositeService
       handleInitApplicationResources(
           ((ApplicationLocalizationEvent)event).getApplication());
       break;
-    case INIT_CONTAINER_RESOURCES:
+    case LOCALIZE_CONTAINER_RESOURCES:
       handleInitContainerResources((ContainerLocalizationRequestEvent) event);
       break;
     case CONTAINER_RESOURCES_LOCALIZED:
@@ -469,6 +469,13 @@ public class ResourceLocalizationService extends CompositeService
   private void handleInitContainerResources(
       ContainerLocalizationRequestEvent rsrcReqs) {
     Container c = rsrcReqs.getContainer();
+    EnumSet<ContainerState> set =
+        EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING);
+    if (!set.contains(c.getContainerState())) {
+      LOG.warn(c.getContainerId() + " is at " + c.getContainerState()
+          + " state, do not localize resources.");
+      return;
+    }
     // create a loading cache for the file statuses
     LoadingCache<Path,Future<FileStatus>> statCache =
         CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
@@ -538,7 +545,7 @@ public class ResourceLocalizationService extends CompositeService
     }
     String locId = c.getContainerId().toString();
     localizerTracker.cleanupPrivLocalizers(locId);
-    
+
     // Delete the container directories
     String userName = c.getUser();
     String containerIDStr = c.toString();
@@ -747,6 +754,14 @@ public class ResourceLocalizationService extends CompositeService
         case APPLICATION:
           synchronized (privLocalizers) {
             LocalizerRunner localizer = privLocalizers.get(locId);
+            if (localizer != null && localizer.killContainerLocalizer.get()) {
+              // Old localizer thread has been stopped; remove it and create
+              // a new localizer thread.
+              LOG.info("New " + event.getType() + " localize request for "
+                  + locId + ", remove old private localizer.");
+              cleanupPrivLocalizers(locId);
+              localizer = null;
+            }
             if (null == localizer) {
               LOG.info("Created localizer for " + locId);
               localizer = new LocalizerRunner(req.getContext(), locId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
new file mode 100644
index 0000000..a41ee20
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks all resources requested by a container and their localization state.
+ */
+public class ResourceSet {
+
+  private static final Log LOG = LogFactory.getLog(ResourceSet.class);
+
+  // resources by localization state (localized, pending, failed)
+  private final Map<Path, List<String>> localizedResources =
+      new ConcurrentHashMap<>();
+  private final Map<LocalResourceRequest, List<String>> pendingResources =
+      new ConcurrentHashMap<>();
+  private final Set<LocalResourceRequest> resourcesFailedToBeLocalized =
+      new HashSet<>();
+
+  // resources by visibility (public, private, app)
+  private final List<LocalResourceRequest> publicRsrcs = new ArrayList<>();
+  private final List<LocalResourceRequest> privateRsrcs = new ArrayList<>();
+  private final List<LocalResourceRequest> appRsrcs = new ArrayList<>();
+
+  private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
+      new ConcurrentHashMap<>();
+  private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Adds the given resources as pending and stores their upload policies.
+   * @param localResourceMap symlink to resource map; may be null or empty
+   * @return the added requests grouped by visibility, or null if none given
+   * @throws URISyntaxException presumably from building LocalResourceRequest
+   */
+  public Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+      addResources(Map<String, LocalResource> localResourceMap)
+      throws URISyntaxException {
+    if (localResourceMap == null || localResourceMap.isEmpty()) {
+      return null;
+    }
+    Map<LocalResourceRequest, List<String>> allResources = new HashMap<>();
+    List<LocalResourceRequest> publicList = new ArrayList<>();
+    List<LocalResourceRequest> privateList = new ArrayList<>();
+    List<LocalResourceRequest> appList = new ArrayList<>();
+
+    for (Map.Entry<String, LocalResource> rsrc : localResourceMap.entrySet()) {
+      LocalResource resource = rsrc.getValue();
+      LocalResourceRequest req = new LocalResourceRequest(resource);
+      allResources.computeIfAbsent(req, k -> new ArrayList<>())
+          .add(rsrc.getKey());
+      storeSharedCacheUploadPolicy(req,
+          resource.getShouldBeUploadedToSharedCache());
+      switch (resource.getVisibility()) {
+      case PUBLIC:
+        publicList.add(req);
+        break;
+      case PRIVATE:
+        privateList.add(req);
+        break;
+      case APPLICATION:
+        appList.add(req);
+        break;
+      default:
+        break;
+      }
+    }
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+        new LinkedHashMap<>();
+    if (!publicList.isEmpty()) {
+      publicRsrcs.addAll(publicList);
+      req.put(LocalResourceVisibility.PUBLIC, publicList);
+    }
+    if (!privateList.isEmpty()) {
+      privateRsrcs.addAll(privateList);
+      req.put(LocalResourceVisibility.PRIVATE, privateList);
+    }
+    if (!appList.isEmpty()) {
+      appRsrcs.addAll(appList);
+      req.put(LocalResourceVisibility.APPLICATION, appList);
+    }
+    this.pendingResources.putAll(allResources);
+    return req;
+  }
+
+  /**
+   * Called when resource localized.
+   * @param request The original request for the localized resource
+   * @param location The path where the resource is localized
+   * @return The symlinks of the resource, or null if it was not pending.
+   */
+  public List<String> resourceLocalized(LocalResourceRequest request,
+      Path location) {
+    List<String> symlinks = pendingResources.remove(request);
+    if (symlinks != null) {
+      localizedResources.put(location, symlinks);
+    }
+    return symlinks;
+  }
+
+  /** Drops the request from pending and records it as failed. */
+  public void resourceLocalizationFailed(LocalResourceRequest request) {
+    pendingResources.remove(request);
+    resourcesFailedToBeLocalized.add(request);
+  }
+
+  /** @return all requested resources so far, grouped by visibility. */
+  public synchronized Map<LocalResourceVisibility,
+      Collection<LocalResourceRequest>> getAllResourcesByVisibility() {
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
+        new HashMap<>();
+    if (!publicRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
+    }
+    if (!privateRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
+    }
+    if (!appRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
+    }
+    return rsrc;
+  }
+
+  /**
+   * Store the resource's shared cache upload policies.
+   * Given LocalResourceRequest can be shared across containers in
+   * LocalResourcesTrackerImpl, we preserve the upload policies here.
+   * In addition, it is possible for the application to create several
+   * "identical" LocalResources as part of
+   * ContainerLaunchContext.setLocalResources with different symlinks.
+   * There is a corner case where these "identical" local resources have
+   * different upload policies. For that scenario, upload policy will be set to
+   * true as long as there is at least one LocalResource entry with
+   * upload policy set to true.
+   */
+  private void storeSharedCacheUploadPolicy(
+      LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
+    Boolean storedUploadPolicy = resourcesUploadPolicies.get(resourceRequest);
+    if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
+      resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
+    }
+  }
+
+  public Map<Path, List<String>> getLocalizedResources() {
+    return localizedResources;
+  }
+
+  public Map<LocalResourceRequest, Path> getResourcesToBeUploaded() {
+    return resourcesToBeUploaded;
+  }
+
+  public Map<LocalResourceRequest, Boolean> getResourcesUploadPolicies() {
+    return resourcesUploadPolicies;
+  }
+
+  public Map<LocalResourceRequest, List<String>> getPendingResources() {
+    return pendingResources;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
index 11bb25e..43a2f33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
@@ -44,7 +44,7 @@ public class ContainerLocalizationRequestEvent extends
    */
   public ContainerLocalizationRequestEvent(Container c,
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
-    super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
+    super(LocalizationEventType.LOCALIZE_CONTAINER_RESOURCES, c);
     this.rsrc = rsrc;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
index 4785fba..cb9842d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.eve
 
 public enum LocalizationEventType {
   INIT_APPLICATION_RESOURCES,
-  INIT_CONTAINER_RESOURCES,
+  LOCALIZE_CONTAINER_RESOURCES,
   CACHE_CLEANUP,
   CLEANUP_CONTAINER_RESOURCES,
   DESTROY_APPLICATION_RESOURCES,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 0371863..8e68889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -326,7 +326,7 @@ public class NMTimelinePublisher extends CompositeService {
       publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
           ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE);
       break;
-    case INIT_CONTAINER_RESOURCES:
+    case LOCALIZE_CONTAINER_RESOURCES:
       publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
           ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE);
       break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index 4aeb2e2..740ed19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -86,7 +86,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
           dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
                 app.getAppId()));
           break;
-        case INIT_CONTAINER_RESOURCES:
+        case LOCALIZE_CONTAINER_RESOURCES:
           ContainerLocalizationRequestEvent rsrcReqs =
             (ContainerLocalizationRequestEvent) event;
           // simulate localization of all requested resources

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index 3e00885..aa0d975 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -131,7 +131,18 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
     LOG.info("Running testContainerLaunchAndExitFailure");
     super.testContainerLaunchAndExitFailure();
   }
-  
+
+  @Override
+  public void testLocalingResourceWhileContainerRunning()
+      throws Exception {
+    // Only run the test when the LCE binary is available.
+    if (shouldRunTest()) {
+      super.testLocalingResourceWhileContainerRunning();
+      return;
+    }
+    LOG.info("LCE binary path is not passed. Not running the test");
+  }
+
   @Override
   public void testLocalFilesCleanup() throws InterruptedException,
       IOException, YarnException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index d8660dd..031300f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
@@ -715,6 +716,11 @@ public abstract class BaseAMRMProxyTest {
 
     @Override
     public NMTimelinePublisher getNMTimelinePublisher() {
+      return  null;
+    }
+
+    @Override
+    public ContainerExecutor getContainerExecutor() {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 1f803b4..5785e1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -36,17 +36,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -83,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -459,7 +463,138 @@ public class TestContainerManager extends BaseContainerManagerTest {
 	  // and verify exit code returned 
 	  testContainerLaunchAndExit(exitCode);	  
   }
-  
+
+  // Writes fileName under tmpDir/dir and returns a one-entry map from symLink
+  // to an APPLICATION-visibility FILE resource that points at the new file.
+  private Map<String, LocalResource> setupLocalResources(String fileName,
+      String symLink) throws Exception {
+    File dir = new File(tmpDir, "dir");
+    dir.mkdirs();
+    File file = new File(dir, fileName);
+    try (PrintWriter fileWriter = new PrintWriter(file)) {
+      fileWriter.write("Hello World!");
+    }
+
+    URL resourceURL = URL.fromPath(FileContext.getLocalFSFileContext()
+        .makeQualified(new Path(file.getAbsolutePath())));
+    LocalResource resource =
+        recordFactory.newRecordInstance(LocalResource.class);
+    resource.setResource(resourceURL);
+    resource.setSize(-1);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    resource.setType(LocalResourceType.FILE);
+    resource.setTimestamp(file.lastModified());
+    Map<String, LocalResource> localResources = new HashMap<>();
+    localResources.put(symLink, resource);
+    return localResources;
+  }
+
+  // Starts a container that sleeps, localizes an additional resource while
+  // it is RUNNING, and verifies that the new symlink appears. After the
+  // container completes, a further localize request must be rejected.
+  @Test
+  public void testLocalingResourceWhileContainerRunning() throws Exception {
+    // Use a real DeletionService constructed with exec
+    delSrvc = new DeletionService(exec);
+    delSrvc.init(conf);
+
+    ((NodeManager.NMContext)context).setContainerExecutor(exec);
+    containerManager = createContainerManager(delSrvc);
+    containerManager.init(conf);
+    containerManager.start();
+    // Initial resource, requested at launch: "file" exposed as "symLink1"
+    Map<String, LocalResource> localResource =
+        setupLocalResources("file", "symLink1");
+    ContainerLaunchContext context =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    context.setLocalResources(localResource);
+
+    // Keep the container alive (sleep 6) so resources can be added meanwhile
+    context.setCommands(Arrays.asList("sleep 6"));
+    ContainerId cId = createContainerId(0);
+
+    // Start the container and wait until it and its application are RUNNING
+    StartContainerRequest scRequest = StartContainerRequest.newInstance(context,
+        createContainerToken(cId, DUMMY_RM_IDENTIFIER, this.context.getNodeId(),
+            user, this.context.getContainerTokenSecretManager()));
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(Arrays.asList(scRequest));
+    containerManager.startContainers(allRequests);
+    BaseContainerManagerTest
+        .waitForContainerState(containerManager, cId, ContainerState.RUNNING);
+
+    BaseContainerManagerTest.waitForApplicationState(containerManager,
+        cId.getApplicationAttemptId().getApplicationId(),
+        ApplicationState.RUNNING);
+    checkResourceLocalized(cId, "symLink1");
+
+    // While the container is still running, request a second resource
+    Map<String, LocalResource> localResource2 =
+        setupLocalResources("file2", "symLink2");
+
+    ResourceLocalizationRequest request =
+        ResourceLocalizationRequest.newInstance(cId, localResource2);
+    containerManager.localize(request);
+
+    // Poll every 500 ms (up to 20 s) until symLink2 passes the localized check
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        try {
+          checkResourceLocalized(cId, "symLink2");
+          return true;
+        } catch (Throwable e) {
+          return false;
+        }
+      }
+    }, 500, 20000);
+
+    BaseContainerManagerTest
+        .waitForContainerState(containerManager, cId, ContainerState.COMPLETE);
+    // A container that is no longer running must reject localize requests.
+    try{
+      containerManager.localize(request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("Not able to localize new resources"));
+    }
+  }
+
+  // Asserts that the app and container dirs exist under both usercache and
+  // nmPrivate in localDir, and that symLink exists in the container dir.
+  private void checkResourceLocalized(ContainerId containerId, String symLink) {
+    String appId =
+        containerId.getApplicationAttemptId().getApplicationId().toString();
+    File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE);
+    File userDir = new File(userCacheDir, user);
+    File appCache = new File(userDir, ContainerLocalizer.APPCACHE);
+    // localDir/usercache/<user>/appcache/<appId>
+    File appDir = new File(appCache, appId);
+    // localDir/usercache/<user>/appcache/<appId>/<containerId>
+    File containerDir = new File(appDir, containerId.toString());
+    // localDir/usercache/<user>/appcache/<appId>/<containerId>/<symLink>
+    File targetFile = new File(containerDir, symLink);
+
+    File sysDir =
+        new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR);
+    // localDir/nmPrivate/<appId>
+    File appSysDir = new File(sysDir, appId);
+    // localDir/nmPrivate/<appId>/<containerId>
+    File containerSysDir = new File(appSysDir, containerId.toString());
+
+    Assert.assertTrue("AppDir " + appDir.getAbsolutePath() + " doesn't exist!!",
+        appDir.exists());
+    Assert.assertTrue("AppSysDir " + appSysDir.getAbsolutePath()
+        + " doesn't exist!!", appSysDir.exists());
+    Assert.assertTrue("containerDir " + containerDir.getAbsolutePath()
+        + " doesn't exist !", containerDir.exists());
+    // The nmPrivate container dir is a separate path from containerDir.
+    Assert.assertTrue("containerSysDir " + containerSysDir.getAbsolutePath()
+        + " doesn't exist !", containerSysDir.exists());
+    Assert.assertTrue("targetFile " + targetFile.getAbsolutePath()
+        + " doesn't exist !!", targetFile.exists());
+  }
+
   @Test
   public void testLocalFilesCleanup() throws InterruptedException,
       IOException, YarnException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 23786fe..7aa657e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -66,6 +66,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
 import org.junit.Assert;
 
@@ -1767,7 +1768,7 @@ public class TestResourceLocalizationService {
       // creating new containers and populating corresponding localizer runners
 
       // Container - 1
-      ContainerImpl container1 = createMockContainer(user, 1);
+      Container container1 = createMockContainer(user, 1);
       String localizerId1 = container1.getContainerId().toString();
       rls.getPrivateLocalizers().put(
         localizerId1,
@@ -2292,7 +2293,7 @@ public class TestResourceLocalizationService {
   }
 
   private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
-      ContainerImpl container, LocalResourceVisibility vis,
+      Container container, LocalResourceVisibility vis,
       LocalResourceRequest req) {
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
         new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
@@ -2310,6 +2311,7 @@ public class TestResourceLocalizationService {
     when(container.getUser()).thenReturn(user);
     Credentials mockCredentials = mock(Credentials.class);
     when(container.getCredentials()).thenReturn(mockCredentials);
+    when(container.getContainerState()).thenReturn(ContainerState.LOCALIZING);
     return container;
   }
 
@@ -2358,6 +2360,7 @@ public class TestResourceLocalizationService {
     creds.addToken(new Text("tok" + id), tk);
     when(c.getCredentials()).thenReturn(creds);
     when(c.toString()).thenReturn(cId.toString());
+    when(c.getContainerState()).thenReturn(ContainerState.LOCALIZING);
     return c;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index 2df0c98..d24f89d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -89,6 +89,13 @@ public class TestContainersMonitorResourceChange {
     public void deleteAsUser(DeletionAsUserContext ctx)
         throws IOException, InterruptedException {
     }
+
+    @Override
+    public void symLink(String target, String symlink) throws IOException {
+      // Intentionally empty: this test stub does not create the requested
+      // symlink.
+    }
+
     @Override
     public String getProcessId(ContainerId containerId) {
       return String.valueOf(containerId.getContainerId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 8332b2a..c176556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,11 +19,6 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.webapp;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
@@ -41,8 +37,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class MockContainer implements Container {
 
   private ContainerId id;
@@ -119,6 +121,11 @@ public class MockContainer implements Container {
   }
 
   @Override
+  public ResourceSet getResourceSet() {
+    return null; // this mock does not provide a ResourceSet
+  }
+
+  @Override
   public void handle(ContainerEvent event) {
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org