You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/09/13 08:29:21 UTC
[07/50] [abbrv] hadoop git commit: YARN-5576. Allow resource
localization while container is running. Contributed by Jian He.
YARN-5576. Allow resource localization while container is running. Contributed by Jian He.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6fcfe28
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6fcfe28
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6fcfe28
Branch: refs/heads/HDFS-10285
Commit: e6fcfe28e304062c7d09231db757acb2953703ce
Parents: 62a9667
Author: Varun Vasudev <vv...@apache.org>
Authored: Tue Sep 6 20:01:45 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Tue Sep 6 20:01:45 2016 +0530
----------------------------------------------------------------------
.../server/nodemanager/ContainerExecutor.java | 9 +
.../hadoop/yarn/server/nodemanager/Context.java | 2 +
.../nodemanager/DefaultContainerExecutor.java | 6 +
.../nodemanager/DockerContainerExecutor.java | 6 +
.../nodemanager/LinuxContainerExecutor.java | 5 +
.../yarn/server/nodemanager/NodeManager.java | 12 ++
.../containermanager/ContainerManagerImpl.java | 70 +++++--
.../containermanager/container/Container.java | 8 +-
.../container/ContainerImpl.java | 207 ++++++++-----------
.../launcher/ContainerLaunch.java | 4 +-
.../localizer/ResourceLocalizationService.java | 21 +-
.../containermanager/localizer/ResourceSet.java | 192 +++++++++++++++++
.../ContainerLocalizationRequestEvent.java | 2 +-
.../localizer/event/LocalizationEventType.java | 2 +-
.../timelineservice/NMTimelinePublisher.java | 2 +-
.../nodemanager/DummyContainerManager.java | 2 +-
.../TestContainerManagerWithLCE.java | 13 +-
.../amrmproxy/BaseAMRMProxyTest.java | 6 +
.../containermanager/TestContainerManager.java | 137 +++++++++++-
.../TestResourceLocalizationService.java | 7 +-
.../TestContainersMonitorResourceChange.java | 7 +
.../nodemanager/webapp/MockContainer.java | 17 +-
22 files changed, 577 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 818b0ea..918c30a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -182,6 +182,15 @@ public abstract class ContainerExecutor implements Configurable {
throws IOException, InterruptedException;
/**
+ * Create a symlink file which points to the target.
+ * @param target The target for symlink
+ * @param symlink the symlink file
+ * @throws IOException Error when creating symlinks
+ */
+ public abstract void symLink(String target, String symlink)
+ throws IOException;
+
+ /**
* Check if a container is alive.
* @param ctx Encapsulates information necessary for container liveness check.
* @return true if container is still alive
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 88bc29c..e888393 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -126,4 +126,6 @@ public interface Context {
void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher);
NMTimelinePublisher getNMTimelinePublisher();
+
+ ContainerExecutor getContainerExecutor();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 13ad9ac..9a0549d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -511,6 +512,11 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
}
+ @Override
+ public void symLink(String target, String symlink) throws IOException {
+ FileUtil.symLink(target, symlink);
+ }
+
/** Permissions for user dir.
* $local.dir/usercache/$user */
static final short USER_PERM = (short)0750;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
index ebf9566..1390214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
@@ -490,6 +490,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
}
}
+ @Override
+ public void symLink(String target, String symlink)
+ throws IOException {
+
+ }
+
/**
* Converts a directory list to a docker mount string
* @param dirs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index 6890b25..cc12b20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -683,6 +683,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
@Override
+ public void symLink(String target, String symlink) {
+
+ }
+
+ @Override
public boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException {
String user = ctx.getUser();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 280a086..7f13334 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -343,6 +343,9 @@ public class NodeManager extends CompositeService
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
+
+ ((NMContext)context).setContainerExecutor(exec);
+
nodeLabelsProvider = createNodeLabelsProvider(conf);
if (null == nodeLabelsProvider) {
@@ -509,6 +512,7 @@ public class NodeManager extends CompositeService
private OpportunisticContainerAllocator containerAllocator;
private final QueuingContext queuingContext;
+ private ContainerExecutor executor;
private NMTimelinePublisher nmTimelinePublisher;
@@ -702,6 +706,14 @@ public class NodeManager extends CompositeService
public NMTimelinePublisher getNMTimelinePublisher() {
return nmTimelinePublisher;
}
+
+ public ContainerExecutor getContainerExecutor() {
+ return this.executor;
+ }
+
+ public void setContainerExecutor(ContainerExecutor executor) {
+ this.executor = executor;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 8c060bc..52d8566 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -18,25 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
-import static org.apache.hadoop.service.Service.STATE.STARTED;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -76,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -129,7 +113,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
@@ -154,8 +140,25 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import static org.apache.hadoop.service.Service.STATE.STARTED;
public class ContainerManagerImpl extends CompositeService implements
ContainerManager {
@@ -1525,6 +1528,31 @@ public class ContainerManagerImpl extends CompositeService implements
public ResourceLocalizationResponse localize(
ResourceLocalizationRequest request) throws YarnException, IOException {
+ ContainerId containerId = request.getContainerId();
+ Container container = context.getContainers().get(containerId);
+ if (container == null) {
+ throw new YarnException("Specified " + containerId + " does not exist!");
+ }
+ if (!container.getContainerState()
+ .equals(org.apache.hadoop.yarn.server.nodemanager.
+ containermanager.container.ContainerState.RUNNING)) {
+ throw new YarnException(
+ containerId + " is at " + container.getContainerState()
+ + " state. Not able to localize new resources.");
+ }
+
+ try {
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ container.getResourceSet().addResources(request.getLocalResources());
+ if (req != null && !req.isEmpty()) {
+ dispatcher.getEventHandler()
+ .handle(new ContainerLocalizationRequestEvent(container, req));
+ }
+ } catch (URISyntaxException e) {
+ LOG.info("Error when parsing local resource URI for " + containerId, e);
+ throw new YarnException(e);
+ }
+
return ResourceLocalizationResponse.newInstance();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 84d3cb2..c4cea18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -31,6 +28,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
+
+import java.util.List;
+import java.util.Map;
public interface Container extends EventHandler<ContainerEvent> {
@@ -74,4 +75,5 @@ public interface Container extends EventHandler<ContainerEvent> {
Priority getPriority();
+ ResourceSet getResourceSet();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 0244d90..ce9e581 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -18,18 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -65,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
@@ -124,20 +122,7 @@ public class ContainerImpl implements Container {
private final Configuration daemonConf;
private static final Log LOG = LogFactory.getLog(ContainerImpl.class);
- private final Map<LocalResourceRequest,List<String>> pendingResources =
- new HashMap<LocalResourceRequest,List<String>>();
- private final Map<Path,List<String>> localizedResources =
- new HashMap<Path,List<String>>();
- private final List<LocalResourceRequest> publicRsrcs =
- new ArrayList<LocalResourceRequest>();
- private final List<LocalResourceRequest> privateRsrcs =
- new ArrayList<LocalResourceRequest>();
- private final List<LocalResourceRequest> appRsrcs =
- new ArrayList<LocalResourceRequest>();
- private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
- new ConcurrentHashMap<LocalResourceRequest, Path>();
- private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
- new ConcurrentHashMap<LocalResourceRequest, Boolean>();
+
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
@@ -145,6 +130,7 @@ public class ContainerImpl implements Container {
// whether container was marked as killed after recovery
private boolean recoveredAsKilled = false;
private Context context;
+ private ResourceSet resourceSet;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@@ -204,6 +190,7 @@ public class ContainerImpl implements Container {
stateMachine = stateMachineFactory.make(this);
this.context = context;
+ this.resourceSet = new ResourceSet();
}
// constructor for a recovered container
@@ -313,6 +300,12 @@ public class ContainerImpl implements Container {
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ new ResourceLocalizedWhileRunningTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+ ContainerEventType.RESOURCE_FAILED,
+ new ResourceLocalizationFailedWhileRunningTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING,
@@ -470,7 +463,7 @@ public class ContainerImpl implements Container {
try {
if (ContainerState.LOCALIZED == getContainerState()
|| ContainerState.RELAUNCHING == getContainerState()) {
- return localizedResources;
+ return resourceSet.getLocalizedResources();
} else {
return null;
}
@@ -591,6 +584,11 @@ public class ContainerImpl implements Container {
this.logDir = logDir;
}
+ @Override
+ public ResourceSet getResourceSet() {
+ return this.resourceSet;
+ }
+
@SuppressWarnings("unchecked")
private void sendFinishedEvents() {
// Inform the application
@@ -653,7 +651,7 @@ public class ContainerImpl implements Container {
for (String s : diags) {
this.diagnostics.append(s);
}
- if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) {
+ if (diagnostics.length() > diagnosticsMaxSize) {
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
}
try {
@@ -667,17 +665,7 @@ public class ContainerImpl implements Container {
@SuppressWarnings("unchecked") // dispatcher not typed
public void cleanup() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
- new HashMap<LocalResourceVisibility,
- Collection<LocalResourceRequest>>();
- if (!publicRsrcs.isEmpty()) {
- rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
- }
- if (!privateRsrcs.isEmpty()) {
- rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
- }
- if (!appRsrcs.isEmpty()) {
- rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
- }
+ resourceSet.getAllResourcesByVisibility();
dispatcher.getEventHandler().handle(
new ContainerLocalizationCleanupEvent(this, rsrc));
}
@@ -697,7 +685,7 @@ public class ContainerImpl implements Container {
* message.
*
* If there are resources to localize, sends a
- * ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
+ * ContainerLocalizationRequest (LOCALIZE_CONTAINER_RESOURCES)
* to the ResourceLocalizationManager and enters LOCALIZING state.
*
* If there are no resources to localize, sends LAUNCH_CONTAINER event
@@ -749,39 +737,15 @@ public class ContainerImpl implements Container {
}
container.containerLocalizationStartTime = clock.getTime();
+
// Send requests for public, private resources
Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty()) {
try {
- for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
- try {
- LocalResourceRequest req =
- new LocalResourceRequest(rsrc.getValue());
- List<String> links = container.pendingResources.get(req);
- if (links == null) {
- links = new ArrayList<String>();
- container.pendingResources.put(req, links);
- }
- links.add(rsrc.getKey());
- storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
- .getShouldBeUploadedToSharedCache());
- switch (rsrc.getValue().getVisibility()) {
- case PUBLIC:
- container.publicRsrcs.add(req);
- break;
- case PRIVATE:
- container.privateRsrcs.add(req);
- break;
- case APPLICATION:
- container.appRsrcs.add(req);
- break;
- }
- } catch (URISyntaxException e) {
- LOG.info("Got exception parsing " + rsrc.getKey()
- + " and value " + rsrc.getValue());
- throw e;
- }
- }
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ container.resourceSet.addResources(ctxt.getLocalResources());
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(container, req));
} catch (URISyntaxException e) {
// malformed resource; abort container launch
LOG.warn("Failed to parse resource-request", e);
@@ -789,21 +753,6 @@ public class ContainerImpl implements Container {
container.metrics.endInitingContainer();
return ContainerState.LOCALIZATION_FAILED;
}
- Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
- new LinkedHashMap<LocalResourceVisibility,
- Collection<LocalResourceRequest>>();
- if (!container.publicRsrcs.isEmpty()) {
- req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
- }
- if (!container.privateRsrcs.isEmpty()) {
- req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
- }
- if (!container.appRsrcs.isEmpty()) {
- req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
- }
-
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
container.sendLaunchEvent();
@@ -814,27 +763,6 @@ public class ContainerImpl implements Container {
}
/**
- * Store the resource's shared cache upload policies
- * Given LocalResourceRequest can be shared across containers in
- * LocalResourcesTrackerImpl, we preserve the upload policies here.
- * In addition, it is possible for the application to create several
- * "identical" LocalResources as part of
- * ContainerLaunchContext.setLocalResources with different symlinks.
- * There is a corner case where these "identical" local resources have
- * different upload policies. For that scenario, upload policy will be set to
- * true as long as there is at least one LocalResource entry with
- * upload policy set to true.
- */
- private static void storeSharedCacheUploadPolicy(ContainerImpl container,
- LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
- Boolean storedUploadPolicy =
- container.resourcesUploadPolicies.get(resourceRequest);
- if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
- container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
- }
- }
-
- /**
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
@@ -847,22 +775,21 @@ public class ContainerImpl implements Container {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
LocalResourceRequest resourceRequest = rsrcEvent.getResource();
Path location = rsrcEvent.getLocation();
- List<String> syms = container.pendingResources.remove(resourceRequest);
+ List<String> syms =
+ container.resourceSet.resourceLocalized(resourceRequest, location);
if (null == syms) {
- LOG.warn("Localized unknown resource " + resourceRequest +
- " for container " + container.containerId);
- assert false;
- // fail container?
+ LOG.info("Localized resource " + resourceRequest +
+ " for container " + container.containerId);
return ContainerState.LOCALIZING;
}
- container.localizedResources.put(location, syms);
// check to see if this resource should be uploaded to the shared cache
// as well
if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
- container.resourcesToBeUploaded.put(resourceRequest, location);
+ container.resourceSet.getResourcesToBeUploaded()
+ .put(resourceRequest, location);
}
- if (!container.pendingResources.isEmpty()) {
+ if (!container.resourceSet.getPendingResources().isEmpty()) {
return ContainerState.LOCALIZING;
}
@@ -884,7 +811,8 @@ public class ContainerImpl implements Container {
&& container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
// kick off uploads to the shared cache
container.dispatcher.getEventHandler().handle(
- new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
+ new SharedCacheUploadEvent(
+ container.resourceSet.getResourcesToBeUploaded(), container
.getLaunchContext(), container.getUser(),
SharedCacheUploadEventType.UPLOAD));
}
@@ -894,6 +822,56 @@ public class ContainerImpl implements Container {
}
/**
+ * Resource is localized while the container is running - create symlinks
+ */
+ static class ResourceLocalizedWhileRunningTransition
+ extends ContainerTransition {
+
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ ContainerResourceLocalizedEvent rsrcEvent =
+ (ContainerResourceLocalizedEvent) event;
+ List<String> links = container.resourceSet
+ .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
+ // creating symlinks.
+ for (String link : links) {
+ try {
+ String linkFile = new Path(container.workDir, link).toString();
+ if (new File(linkFile).exists()) {
+ LOG.info("Symlink file already exists: " + linkFile);
+ } else {
+ container.context.getContainerExecutor()
+ .symLink(rsrcEvent.getLocation().toString(), linkFile);
+ LOG.info("Created symlink: " + linkFile + " -> " + rsrcEvent
+ .getLocation());
+ }
+ } catch (IOException e) {
+ String message = String
+ .format("Error when creating symlink %s -> %s", link,
+ rsrcEvent.getLocation());
+ LOG.error(message, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Resource localization failed while the container is running.
+ */
+ static class ResourceLocalizationFailedWhileRunningTransition
+ extends ContainerTransition {
+
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ ContainerResourceFailedEvent failedEvent =
+ (ContainerResourceFailedEvent) event;
+ container.resourceSet
+ .resourceLocalizationFailed(failedEvent.getResource());
+ container.addDiagnostics(failedEvent.getDiagnosticMessage());
+ }
+ }
+
+ /**
* Transition from LOCALIZED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event
*/
@@ -1136,17 +1114,10 @@ public class ContainerImpl implements Container {
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
- ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
- List<String> syms =
- container.pendingResources.remove(rsrcEvent.getResource());
- if (null == syms) {
- LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
- " for container " + container.containerId);
- assert false;
- // fail container?
- return;
- }
- container.localizedResources.put(rsrcEvent.getLocation(), syms);
+ ContainerResourceLocalizedEvent rsrcEvent =
+ (ContainerResourceLocalizedEvent) event;
+ container.resourceSet
+ .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
}
}
@@ -1402,7 +1373,7 @@ public class ContainerImpl implements Container {
*/
private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
LocalResourceRequest resource) {
- return container.resourcesUploadPolicies.get(resource);
+ return container.resourceSet.getResourcesUploadPolicies().get(resource);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 14190fc..d8239ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -1201,16 +1201,16 @@ public class ContainerLaunch implements Callable<Integer> {
private void recordContainerLogDir(ContainerId containerId,
String logDir) throws IOException{
+ container.setLogDir(logDir);
if (container.isRetryContextSet()) {
- container.setLogDir(logDir);
context.getNMStateStore().storeContainerLogDir(containerId, logDir);
}
}
private void recordContainerWorkDir(ContainerId containerId,
String workDir) throws IOException{
+ container.setWorkDir(workDir);
if (container.isRetryContextSet()) {
- container.setWorkDir(workDir);
context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 0ec2f29..b281ef5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
@@ -135,7 +136,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
@@ -419,7 +419,7 @@ public class ResourceLocalizationService extends CompositeService
handleInitApplicationResources(
((ApplicationLocalizationEvent)event).getApplication());
break;
- case INIT_CONTAINER_RESOURCES:
+ case LOCALIZE_CONTAINER_RESOURCES:
handleInitContainerResources((ContainerLocalizationRequestEvent) event);
break;
case CONTAINER_RESOURCES_LOCALIZED:
@@ -469,6 +469,13 @@ public class ResourceLocalizationService extends CompositeService
private void handleInitContainerResources(
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
+ EnumSet<ContainerState> set =
+ EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING);
+ if (!set.contains(c.getContainerState())) {
+ LOG.warn(c.getContainerId() + " is at " + c.getContainerState()
+ + " state, do not localize resources.");
+ return;
+ }
// create a loading cache for the file statuses
LoadingCache<Path,Future<FileStatus>> statCache =
CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
@@ -538,7 +545,7 @@ public class ResourceLocalizationService extends CompositeService
}
String locId = c.getContainerId().toString();
localizerTracker.cleanupPrivLocalizers(locId);
-
+
// Delete the container directories
String userName = c.getUser();
String containerIDStr = c.toString();
@@ -747,6 +754,14 @@ public class ResourceLocalizationService extends CompositeService
case APPLICATION:
synchronized (privLocalizers) {
LocalizerRunner localizer = privLocalizers.get(locId);
+ if (localizer != null && localizer.killContainerLocalizer.get()) {
+ // Old localizer thread has been stopped, remove it and creates
+ // a new localizer thread.
+ LOG.info("New " + event.getType() + " localize request for "
+ + locId + ", remove old private localizer.");
+ cleanupPrivLocalizers(locId);
+ localizer = null;
+ }
if (null == localizer) {
LOG.info("Created localizer for " + locId);
localizer = new LocalizerRunner(req.getContext(), locId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
new file mode 100644
index 0000000..a41ee20
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All Resources requested by the container.
+ */
+public class ResourceSet {
+
+ private static final Log LOG = LogFactory.getLog(ResourceSet.class);
+
+ // resources by localization state (localized, pending, failed)
+ private Map<Path, List<String>> localizedResources =
+ new ConcurrentHashMap<>();
+ private Map<LocalResourceRequest, List<String>> pendingResources =
+ new ConcurrentHashMap<>();
+ private Set<LocalResourceRequest> resourcesFailedToBeLocalized =
+ new HashSet<>();
+
+ // resources by visibility (public, private, app)
+ private final List<LocalResourceRequest> publicRsrcs =
+ new ArrayList<>();
+ private final List<LocalResourceRequest> privateRsrcs =
+ new ArrayList<>();
+ private final List<LocalResourceRequest> appRsrcs =
+ new ArrayList<>();
+
+ private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
+ new ConcurrentHashMap<>();
+ private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
+ new ConcurrentHashMap<>();
+
+ public Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ addResources(Map<String, LocalResource> localResourceMap)
+ throws URISyntaxException {
+ if (localResourceMap == null || localResourceMap.isEmpty()) {
+ return null;
+ }
+ Map<LocalResourceRequest, List<String>> allResources = new HashMap<>();
+ List<LocalResourceRequest> publicList = new ArrayList<>();
+ List<LocalResourceRequest> privateList = new ArrayList<>();
+ List<LocalResourceRequest> appList = new ArrayList<>();
+
+ for (Map.Entry<String, LocalResource> rsrc : localResourceMap.entrySet()) {
+ LocalResource resource = rsrc.getValue();
+ LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
+ allResources.putIfAbsent(req, new ArrayList<>());
+ allResources.get(req).add(rsrc.getKey());
+ storeSharedCacheUploadPolicy(req,
+ resource.getShouldBeUploadedToSharedCache());
+ switch (resource.getVisibility()) {
+ case PUBLIC:
+ publicList.add(req);
+ break;
+ case PRIVATE:
+ privateList.add(req);
+ break;
+ case APPLICATION:
+ appList.add(req);
+ break;
+ default:
+ break;
+ }
+ }
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ new LinkedHashMap<>();
+ if (!publicList.isEmpty()) {
+ publicRsrcs.addAll(publicList);
+ req.put(LocalResourceVisibility.PUBLIC, publicList);
+ }
+ if (!privateList.isEmpty()) {
+ privateRsrcs.addAll(privateList);
+ req.put(LocalResourceVisibility.PRIVATE, privateList);
+ }
+ if (!appList.isEmpty()) {
+ appRsrcs.addAll(appList);
+ req.put(LocalResourceVisibility.APPLICATION, appList);
+ }
+ if (!allResources.isEmpty()) {
+ this.pendingResources.putAll(allResources);
+ }
+ return req;
+ }
+
+ /**
+ * Called when resource localized.
+ * @param request The original request for the localized resource
+ * @param location The path where the resource is localized
+ * @return The list of symlinks for the localized resources.
+ */
+ public List<String> resourceLocalized(LocalResourceRequest request,
+ Path location) {
+ List<String> symlinks = pendingResources.remove(request);
+ if (symlinks == null) {
+ return null;
+ } else {
+ localizedResources.put(location, symlinks);
+ return symlinks;
+ }
+ }
+
+ public void resourceLocalizationFailed(LocalResourceRequest request) {
+ pendingResources.remove(request);
+ resourcesFailedToBeLocalized.add(request);
+ }
+
+ public synchronized Map<LocalResourceVisibility,
+ Collection<LocalResourceRequest>> getAllResourcesByVisibility() {
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
+ new HashMap<>();
+ if (!publicRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
+ }
+ if (!privateRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
+ }
+ if (!appRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
+ }
+ return rsrc;
+ }
+
+ /**
+ * Store the resource's shared cache upload policies
+ * Given LocalResourceRequest can be shared across containers in
+ * LocalResourcesTrackerImpl, we preserve the upload policies here.
+ * In addition, it is possible for the application to create several
+ * "identical" LocalResources as part of
+ * ContainerLaunchContext.setLocalResources with different symlinks.
+ * There is a corner case where these "identical" local resources have
+ * different upload policies. For that scenario, upload policy will be set to
+ * true as long as there is at least one LocalResource entry with
+ * upload policy set to true.
+ */
+ private void storeSharedCacheUploadPolicy(
+ LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
+ Boolean storedUploadPolicy = resourcesUploadPolicies.get(resourceRequest);
+ if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
+ resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
+ }
+ }
+
+ public Map<Path, List<String>> getLocalizedResources() {
+ return localizedResources;
+ }
+
+ public Map<LocalResourceRequest, Path> getResourcesToBeUploaded() {
+ return resourcesToBeUploaded;
+ }
+
+ public Map<LocalResourceRequest, Boolean> getResourcesUploadPolicies() {
+ return resourcesUploadPolicies;
+ }
+
+ public Map<LocalResourceRequest, List<String>> getPendingResources() {
+ return pendingResources;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
index 11bb25e..43a2f33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
@@ -44,7 +44,7 @@ public class ContainerLocalizationRequestEvent extends
*/
public ContainerLocalizationRequestEvent(Container c,
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
- super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
+ super(LocalizationEventType.LOCALIZE_CONTAINER_RESOURCES, c);
this.rsrc = rsrc;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
index 4785fba..cb9842d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.eve
public enum LocalizationEventType {
INIT_APPLICATION_RESOURCES,
- INIT_CONTAINER_RESOURCES,
+ LOCALIZE_CONTAINER_RESOURCES,
CACHE_CLEANUP,
CLEANUP_CONTAINER_RESOURCES,
DESTROY_APPLICATION_RESOURCES,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 0371863..8e68889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -326,7 +326,7 @@ public class NMTimelinePublisher extends CompositeService {
publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE);
break;
- case INIT_CONTAINER_RESOURCES:
+ case LOCALIZE_CONTAINER_RESOURCES:
publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE);
break;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index 4aeb2e2..740ed19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -86,7 +86,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
app.getAppId()));
break;
- case INIT_CONTAINER_RESOURCES:
+ case LOCALIZE_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event;
// simulate localization of all requested resources
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index 3e00885..aa0d975 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -131,7 +131,18 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
LOG.info("Running testContainerLaunchAndExitFailure");
super.testContainerLaunchAndExitFailure();
}
-
+
+ @Override
+ public void testLocalingResourceWhileContainerRunning()
+ throws Exception {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ super.testLocalingResourceWhileContainerRunning();
+ }
+
@Override
public void testLocalFilesCleanup() throws InterruptedException,
IOException, YarnException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index d8660dd..031300f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
@@ -715,6 +716,11 @@ public abstract class BaseAMRMProxyTest {
@Override
public NMTimelinePublisher getNMTimelinePublisher() {
+ return null;
+ }
+
+ @Override
+ public ContainerExecutor getContainerExecutor() {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 1f803b4..5785e1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -36,17 +36,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Supplier;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -83,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -459,7 +463,138 @@ public class TestContainerManager extends BaseContainerManagerTest {
// and verify exit code returned
testContainerLaunchAndExit(exitCode);
}
-
+
+ private Map<String, LocalResource> setupLocalResources(String fileName,
+ String symLink) throws Exception {
+ // ////// Create the resources for the container
+ File dir = new File(tmpDir, "dir");
+ dir.mkdirs();
+ File file = new File(dir, fileName);
+ PrintWriter fileWriter = new PrintWriter(file);
+ fileWriter.write("Hello World!");
+ fileWriter.close();
+
+ URL resourceURL = URL.fromPath(FileContext.getLocalFSFileContext()
+ .makeQualified(new Path(file.getAbsolutePath())));
+ LocalResource resource =
+ recordFactory.newRecordInstance(LocalResource.class);
+ resource.setResource(resourceURL);
+ resource.setSize(-1);
+ resource.setVisibility(LocalResourceVisibility.APPLICATION);
+ resource.setType(LocalResourceType.FILE);
+ resource.setTimestamp(file.lastModified());
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put(symLink, resource);
+ return localResources;
+ }
+
+ // Start the container
+ // While the container is running, localize new resources.
+ // Verify the symlink is created properly
+ @Test
+ public void testLocalingResourceWhileContainerRunning() throws Exception {
+ // Real del service
+ delSrvc = new DeletionService(exec);
+ delSrvc.init(conf);
+
+ ((NodeManager.NMContext)context).setContainerExecutor(exec);
+ containerManager = createContainerManager(delSrvc);
+ containerManager.init(conf);
+ containerManager.start();
+ // set up local resources
+ Map<String, LocalResource> localResource =
+ setupLocalResources("file", "symLink1");
+ ContainerLaunchContext context =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ context.setLocalResources(localResource);
+
+ // a long running container - sleep
+ context.setCommands(Arrays.asList("sleep 6"));
+ ContainerId cId = createContainerId(0);
+
+ // start the container
+ StartContainerRequest scRequest = StartContainerRequest.newInstance(context,
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, this.context.getNodeId(),
+ user, this.context.getContainerTokenSecretManager()));
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(Arrays.asList(scRequest));
+ containerManager.startContainers(allRequests);
+ BaseContainerManagerTest
+ .waitForContainerState(containerManager, cId, ContainerState.RUNNING);
+
+ BaseContainerManagerTest.waitForApplicationState(containerManager,
+ cId.getApplicationAttemptId().getApplicationId(),
+ ApplicationState.RUNNING);
+ checkResourceLocalized(cId, "symLink1");
+
+ // Localize new local resources while container is running
+ Map<String, LocalResource> localResource2 =
+ setupLocalResources("file2", "symLink2");
+
+ ResourceLocalizationRequest request =
+ ResourceLocalizationRequest.newInstance(cId, localResource2);
+ containerManager.localize(request);
+
+ // Verify resource is localized and symlink is created.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ try {
+ checkResourceLocalized(cId, "symLink2");
+ return true;
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+ }, 500, 20000);
+
+ BaseContainerManagerTest
+ .waitForContainerState(containerManager, cId, ContainerState.COMPLETE);
+ // Verify container cannot localize resources while at non-running state.
+ try{
+ containerManager.localize(request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(
+ e.getMessage().contains("Not able to localize new resources"));
+ }
+ }
+
+ private void checkResourceLocalized(ContainerId containerId, String symLink) {
+ String appId =
+ containerId.getApplicationAttemptId().getApplicationId().toString();
+ File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE);
+ File userDir = new File(userCacheDir, user);
+ File appCache = new File(userDir, ContainerLocalizer.APPCACHE);
+ // localDir/usercache/nobody/appcache/application_0_0000
+ File appDir = new File(appCache, appId);
+ // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000
+ File containerDir = new File(appDir, containerId.toString());
+ // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000/symLink1
+ File targetFile = new File(containerDir, symLink);
+
+ File sysDir =
+ new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR);
+ // localDir/nmPrivate/application_0_0000
+ File appSysDir = new File(sysDir, appId);
+ // localDir/nmPrivate/application_0_0000/container_0_0000_01_000000
+ File containerSysDir = new File(appSysDir, containerId.toString());
+
+ Assert.assertTrue("AppDir " + appDir.getAbsolutePath() + " doesn't exist!!",
+ appDir.exists());
+ Assert.assertTrue(
+ "AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!",
+ appSysDir.exists());
+ Assert.assertTrue(
+ "containerDir " + containerDir.getAbsolutePath() + " doesn't exist !",
+ containerDir.exists());
+ Assert.assertTrue("containerSysDir " + containerSysDir.getAbsolutePath()
+ + " doesn't exist !", containerDir.exists());
+ Assert.assertTrue(
+ "targetFile " + targetFile.getAbsolutePath() + " doesn't exist !!",
+ targetFile.exists());
+ }
+
@Test
public void testLocalFilesCleanup() throws InterruptedException,
IOException, YarnException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 23786fe..7aa657e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -66,6 +66,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.Assert;
@@ -1767,7 +1768,7 @@ public class TestResourceLocalizationService {
// creating new containers and populating corresponding localizer runners
// Container - 1
- ContainerImpl container1 = createMockContainer(user, 1);
+ Container container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerId().toString();
rls.getPrivateLocalizers().put(
localizerId1,
@@ -2292,7 +2293,7 @@ public class TestResourceLocalizationService {
}
private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
- ContainerImpl container, LocalResourceVisibility vis,
+ Container container, LocalResourceVisibility vis,
LocalResourceRequest req) {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
@@ -2310,6 +2311,7 @@ public class TestResourceLocalizationService {
when(container.getUser()).thenReturn(user);
Credentials mockCredentials = mock(Credentials.class);
when(container.getCredentials()).thenReturn(mockCredentials);
+ when(container.getContainerState()).thenReturn(ContainerState.LOCALIZING);
return container;
}
@@ -2358,6 +2360,7 @@ public class TestResourceLocalizationService {
creds.addToken(new Text("tok" + id), tk);
when(c.getCredentials()).thenReturn(creds);
when(c.toString()).thenReturn(cId.toString());
+ when(c.getContainerState()).thenReturn(ContainerState.LOCALIZING);
return c;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index 2df0c98..d24f89d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -89,6 +89,13 @@ public class TestContainersMonitorResourceChange {
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
}
+
+ @Override
+ public void symLink(String target, String symlink)
+ throws IOException {
+
+ }
+
@Override
public String getProcessId(ContainerId containerId) {
return String.valueOf(containerId.getContainerId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6fcfe28/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 8332b2a..c176556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,11 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
@@ -41,8 +37,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class MockContainer implements Container {
private ContainerId id;
@@ -119,6 +121,11 @@ public class MockContainer implements Container {
}
@Override
+ public ResourceSet getResourceSet() {
+ return null;
+ }
+
+ @Override
public void handle(ContainerEvent event) {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org