You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that is not preserved in this text version.)
Posted to common-commits@hadoop.apache.org by te...@apache.org on 2017/05/10 17:46:56 UTC
hadoop git commit: YARN-6475. Fix some long function checkstyle issues (Contributed by Soumabrata Chakraborty via Daniel Templeton)
Repository: hadoop
Updated Branches:
refs/heads/trunk 1e71fe8c4 -> 74a61438c
YARN-6475. Fix some long function checkstyle issues (Contributed by Soumabrata Chakraborty via Daniel Templeton)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74a61438
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74a61438
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74a61438
Branch: refs/heads/trunk
Commit: 74a61438ca01e2191b54000af73b654a2d0b8253
Parents: 1e71fe8
Author: Daniel Templeton <te...@apache.org>
Authored: Wed May 10 10:45:02 2017 -0700
Committer: Daniel Templeton <te...@apache.org>
Committed: Wed May 10 10:46:50 2017 -0700
----------------------------------------------------------------------
.../nodemanager/LinuxContainerExecutor.java | 92 ++---
.../nodemanager/NodeStatusUpdaterImpl.java | 390 ++++++++++---------
.../launcher/ContainerLaunch.java | 238 ++++++-----
.../runtime/DockerLinuxContainerRuntime.java | 85 ++--
4 files changed, 429 insertions(+), 376 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index cb1d53d..9a3b2d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -62,7 +62,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.regex.Pattern;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.*;
@@ -442,24 +441,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException {
Container container = ctx.getContainer();
- Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
- Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String user = ctx.getUser();
- String appId = ctx.getAppId();
- Path containerWorkDir = ctx.getContainerWorkDir();
- List<String> localDirs = ctx.getLocalDirs();
- List<String> logDirs = ctx.getLogDirs();
- List<String> filecacheDirs = ctx.getFilecacheDirs();
- List<String> userLocalDirs = ctx.getUserLocalDirs();
- List<String> containerLocalDirs = ctx.getContainerLocalDirs();
- List<String> containerLogDirs = ctx.getContainerLogDirs();
- Map<Path, List<String>> localizedResources = ctx.getLocalizedResources();
verifyUsernamePattern(user);
- String runAsUser = getRunAsUser(user);
ContainerId containerId = container.getContainerId();
- String containerIdStr = containerId.toString();
resourcesHandler.preExecute(containerId,
container.getResource());
@@ -514,39 +500,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
try {
Path pidFilePath = getPidFilePath(containerId);
if (pidFilePath != null) {
- List<String> prefixCommands = new ArrayList<>();
- ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
- .Builder(container);
- addSchedPriorityCommand(prefixCommands);
- if (prefixCommands.size() > 0) {
- builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
- prefixCommands);
- }
+ ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
+ ctx, pidFilePath, resourcesOptions, tcCommandFile);
- builder.setExecutionAttribute(LOCALIZED_RESOURCES, localizedResources)
- .setExecutionAttribute(RUN_AS_USER, runAsUser)
- .setExecutionAttribute(USER, user)
- .setExecutionAttribute(APPID, appId)
- .setExecutionAttribute(CONTAINER_ID_STR, containerIdStr)
- .setExecutionAttribute(CONTAINER_WORK_DIR, containerWorkDir)
- .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
- nmPrivateContainerScriptPath)
- .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH, nmPrivateTokensPath)
- .setExecutionAttribute(PID_FILE_PATH, pidFilePath)
- .setExecutionAttribute(LOCAL_DIRS, localDirs)
- .setExecutionAttribute(LOG_DIRS, logDirs)
- .setExecutionAttribute(FILECACHE_DIRS, filecacheDirs)
- .setExecutionAttribute(USER_LOCAL_DIRS, userLocalDirs)
- .setExecutionAttribute(CONTAINER_LOCAL_DIRS, containerLocalDirs)
- .setExecutionAttribute(CONTAINER_LOG_DIRS, containerLogDirs)
- .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);
-
- if (tcCommandFile != null) {
- builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
- }
-
- linuxContainerRuntime.launchContainer(builder.build());
+ linuxContainerRuntime.launchContainer(runtimeContext);
} else {
LOG.info(
"Container was marked as inactive. Returning terminated error");
@@ -617,6 +575,50 @@ public class LinuxContainerExecutor extends ContainerExecutor {
return 0;
}
+ private ContainerRuntimeContext buildContainerRuntimeContext(
+ ContainerStartContext ctx, Path pidFilePath,
+ String resourcesOptions, String tcCommandFile) {
+
+ List<String> prefixCommands = new ArrayList<>();
+ addSchedPriorityCommand(prefixCommands);
+
+ Container container = ctx.getContainer();
+
+ ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
+ .Builder(container);
+ if (prefixCommands.size() > 0) {
+ builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
+ prefixCommands);
+ }
+
+ builder.setExecutionAttribute(LOCALIZED_RESOURCES,
+ ctx.getLocalizedResources())
+ .setExecutionAttribute(RUN_AS_USER, getRunAsUser(ctx.getUser()))
+ .setExecutionAttribute(USER, ctx.getUser())
+ .setExecutionAttribute(APPID, ctx.getAppId())
+ .setExecutionAttribute(CONTAINER_ID_STR,
+ container.getContainerId().toString())
+ .setExecutionAttribute(CONTAINER_WORK_DIR, ctx.getContainerWorkDir())
+ .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
+ ctx.getNmPrivateContainerScriptPath())
+ .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH,
+ ctx.getNmPrivateTokensPath())
+ .setExecutionAttribute(PID_FILE_PATH, pidFilePath)
+ .setExecutionAttribute(LOCAL_DIRS, ctx.getLocalDirs())
+ .setExecutionAttribute(LOG_DIRS, ctx.getLogDirs())
+ .setExecutionAttribute(FILECACHE_DIRS, ctx.getFilecacheDirs())
+ .setExecutionAttribute(USER_LOCAL_DIRS, ctx.getUserLocalDirs())
+ .setExecutionAttribute(CONTAINER_LOCAL_DIRS, ctx.getContainerLocalDirs())
+ .setExecutionAttribute(CONTAINER_LOG_DIRS, ctx.getContainerLogDirs())
+ .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);
+
+ if (tcCommandFile != null) {
+ builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
+ }
+
+ return builder.build();
+ }
+
@Override
public String[] getIpAndHost(Container container) {
return linuxContainerRuntime.getIpAndHost(container);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index dd5b279..00073d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -761,200 +761,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void startStatusUpdater() {
- statusUpdaterRunnable = new Runnable() {
- @Override
- @SuppressWarnings("unchecked")
- public void run() {
- int lastHeartbeatID = 0;
- while (!isStopped) {
- // Send heartbeat
- try {
- NodeHeartbeatResponse response = null;
- Set<NodeLabel> nodeLabelsForHeartbeat =
- nodeLabelsHandler.getNodeLabelsForHeartbeat();
- NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
- NodeHeartbeatRequest request =
- NodeHeartbeatRequest.newInstance(nodeStatus,
- NodeStatusUpdaterImpl.this.context
- .getContainerTokenSecretManager().getCurrentKey(),
- NodeStatusUpdaterImpl.this.context
- .getNMTokenSecretManager().getCurrentKey(),
- nodeLabelsForHeartbeat,
- NodeStatusUpdaterImpl.this.context
- .getRegisteredCollectors());
-
- if (logAggregationEnabled) {
- // pull log aggregation status for application running in this NM
- List<LogAggregationReport> logAggregationReports =
- getLogAggregationReportsForApps(context
- .getLogAggregationStatusForApps());
- if (logAggregationReports != null
- && !logAggregationReports.isEmpty()) {
- request.setLogAggregationReportsForApps(logAggregationReports);
- }
- }
-
- response = resourceTracker.nodeHeartbeat(request);
- //get next heartbeat interval from response
- nextHeartBeatInterval = response.getNextHeartBeatInterval();
- updateMasterKeys(response);
-
- if (!handleShutdownOrResyncCommand(response)) {
- nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
- response);
-
- // Explicitly put this method after checking the resync
- // response. We
- // don't want to remove the completed containers before resync
- // because these completed containers will be reported back to RM
- // when NM re-registers with RM.
- // Only remove the cleanedup containers that are acked
- removeOrTrackCompletedContainersFromContext(response
- .getContainersToBeRemovedFromNM());
-
- logAggregationReportForAppsTempList.clear();
- lastHeartbeatID = response.getResponseId();
- List<ContainerId> containersToCleanup = response
- .getContainersToCleanup();
- if (!containersToCleanup.isEmpty()) {
- dispatcher.getEventHandler().handle(
- new CMgrCompletedContainersEvent(containersToCleanup,
- CMgrCompletedContainersEvent.Reason
- .BY_RESOURCEMANAGER));
- }
- List<ApplicationId> appsToCleanup =
- response.getApplicationsToCleanup();
- //Only start tracking for keepAlive on FINISH_APP
- trackAppsForKeepAlive(appsToCleanup);
- if (!appsToCleanup.isEmpty()) {
- dispatcher.getEventHandler().handle(
- new CMgrCompletedAppsEvent(appsToCleanup,
- CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
- }
- Map<ApplicationId, ByteBuffer> systemCredentials =
- response.getSystemCredentialsForApps();
- if (systemCredentials != null && !systemCredentials.isEmpty()) {
- ((NMContext) context).setSystemCrendentialsForApps(
- parseCredentials(systemCredentials));
- }
- List<org.apache.hadoop.yarn.api.records.Container>
- containersToDecrease = response.getContainersToDecrease();
- if (!containersToDecrease.isEmpty()) {
- dispatcher.getEventHandler().handle(
- new CMgrDecreaseContainersResourceEvent(
- containersToDecrease)
- );
- }
-
- // SignalContainer request originally comes from end users via
- // ClientRMProtocol's SignalContainer. Forward the request to
- // ContainerManager which will dispatch the event to
- // ContainerLauncher.
- List<SignalContainerRequest> containersToSignal = response
- .getContainersToSignalList();
- if (containersToSignal.size() != 0) {
- dispatcher.getEventHandler().handle(
- new CMgrSignalContainersEvent(containersToSignal));
- }
-
- // Update QueuingLimits if ContainerManager supports queuing
- ContainerQueuingLimit queuingLimit =
- response.getContainerQueuingLimit();
- if (queuingLimit != null) {
- context.getContainerManager().updateQueuingLimit(queuingLimit);
- }
- }
- // Handling node resource update case.
- Resource newResource = response.getResource();
- if (newResource != null) {
- updateNMResource(newResource);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node's resource is updated to " +
- newResource.toString());
- }
- }
- if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
- updateTimelineClientsAddress(response);
- }
-
- } catch (ConnectException e) {
- //catch and throw the exception if tried MAX wait time to connect RM
- dispatcher.getEventHandler().handle(
- new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
- // failed to connect to RM.
- failedToConnect = true;
- throw new YarnRuntimeException(e);
- } catch (Throwable e) {
-
- // TODO Better error handling. Thread can die with the rest of the
- // NM still running.
- LOG.error("Caught exception in status-updater", e);
- } finally {
- synchronized (heartbeatMonitor) {
- nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
- YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
- nextHeartBeatInterval;
- try {
- heartbeatMonitor.wait(nextHeartBeatInterval);
- } catch (InterruptedException e) {
- // Do Nothing
- }
- }
- }
- }
- }
-
- private void updateTimelineClientsAddress(
- NodeHeartbeatResponse response) {
- Map<ApplicationId, String> knownCollectorsMap =
- response.getAppCollectorsMap();
- if (knownCollectorsMap == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No collectors to update RM");
- }
- } else {
- Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
- knownCollectorsMap.entrySet();
- for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
- ApplicationId appId = entry.getKey();
- String collectorAddr = entry.getValue();
-
- // Only handle applications running on local node.
- // Not include apps with timeline collectors running in local
- Application application = context.getApplications().get(appId);
- // TODO this logic could be problematic if the collector address
- // gets updated due to NM restart or collector service failure
- if (application != null &&
- !context.getRegisteredCollectors().containsKey(appId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sync a new collector address: " + collectorAddr +
- " for application: " + appId + " from RM.");
- }
- NMTimelinePublisher nmTimelinePublisher =
- context.getNMTimelinePublisher();
- if (nmTimelinePublisher != null) {
- nmTimelinePublisher.setTimelineServiceAddress(
- application.getAppId(), collectorAddr);
- }
- }
- }
- }
- }
-
- private void updateMasterKeys(NodeHeartbeatResponse response) {
- // See if the master-key has rolled over
- MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
- if (updatedMasterKey != null) {
- // Will be non-null only on roll-over on RM side
- context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
- }
-
- updatedMasterKey = response.getNMTokenMasterKey();
- if (updatedMasterKey != null) {
- context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
- }
- }
- };
+ statusUpdaterRunnable = new StatusUpdaterRunnable();
statusUpdater =
new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start();
@@ -1215,4 +1022,199 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
}
+
+ private class StatusUpdaterRunnable implements Runnable {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ int lastHeartbeatID = 0;
+ while (!isStopped) {
+ // Send heartbeat
+ try {
+ NodeHeartbeatResponse response = null;
+ Set<NodeLabel> nodeLabelsForHeartbeat =
+ nodeLabelsHandler.getNodeLabelsForHeartbeat();
+ NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
+ NodeHeartbeatRequest request =
+ NodeHeartbeatRequest.newInstance(nodeStatus,
+ NodeStatusUpdaterImpl.this.context
+ .getContainerTokenSecretManager().getCurrentKey(),
+ NodeStatusUpdaterImpl.this.context
+ .getNMTokenSecretManager().getCurrentKey(),
+ nodeLabelsForHeartbeat,
+ NodeStatusUpdaterImpl.this.context
+ .getRegisteredCollectors());
+
+ if (logAggregationEnabled) {
+ // pull log aggregation status for application running in this NM
+ List<LogAggregationReport> logAggregationReports =
+ getLogAggregationReportsForApps(context
+ .getLogAggregationStatusForApps());
+ if (logAggregationReports != null
+ && !logAggregationReports.isEmpty()) {
+ request.setLogAggregationReportsForApps(logAggregationReports);
+ }
+ }
+
+ response = resourceTracker.nodeHeartbeat(request);
+ //get next heartbeat interval from response
+ nextHeartBeatInterval = response.getNextHeartBeatInterval();
+ updateMasterKeys(response);
+
+ if (!handleShutdownOrResyncCommand(response)) {
+ nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
+ response);
+
+ // Explicitly put this method after checking the resync
+ // response. We
+ // don't want to remove the completed containers before resync
+ // because these completed containers will be reported back to RM
+ // when NM re-registers with RM.
+ // Only remove the cleanedup containers that are acked
+ removeOrTrackCompletedContainersFromContext(response
+ .getContainersToBeRemovedFromNM());
+
+ logAggregationReportForAppsTempList.clear();
+ lastHeartbeatID = response.getResponseId();
+ List<ContainerId> containersToCleanup = response
+ .getContainersToCleanup();
+ if (!containersToCleanup.isEmpty()) {
+ dispatcher.getEventHandler().handle(
+ new CMgrCompletedContainersEvent(containersToCleanup,
+ CMgrCompletedContainersEvent.Reason
+ .BY_RESOURCEMANAGER));
+ }
+ List<ApplicationId> appsToCleanup =
+ response.getApplicationsToCleanup();
+ //Only start tracking for keepAlive on FINISH_APP
+ trackAppsForKeepAlive(appsToCleanup);
+ if (!appsToCleanup.isEmpty()) {
+ dispatcher.getEventHandler().handle(
+ new CMgrCompletedAppsEvent(appsToCleanup,
+ CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+ }
+ Map<ApplicationId, ByteBuffer> systemCredentials =
+ response.getSystemCredentialsForApps();
+ if (systemCredentials != null && !systemCredentials.isEmpty()) {
+ ((NMContext) context).setSystemCrendentialsForApps(
+ parseCredentials(systemCredentials));
+ }
+ List<org.apache.hadoop.yarn.api.records.Container>
+ containersToDecrease = response.getContainersToDecrease();
+ if (!containersToDecrease.isEmpty()) {
+ dispatcher.getEventHandler().handle(
+ new CMgrDecreaseContainersResourceEvent(
+ containersToDecrease)
+ );
+ }
+
+ // SignalContainer request originally comes from end users via
+ // ClientRMProtocol's SignalContainer. Forward the request to
+ // ContainerManager which will dispatch the event to
+ // ContainerLauncher.
+ List<SignalContainerRequest> containersToSignal = response
+ .getContainersToSignalList();
+ if (!containersToSignal.isEmpty()) {
+ dispatcher.getEventHandler().handle(
+ new CMgrSignalContainersEvent(containersToSignal));
+ }
+
+ // Update QueuingLimits if ContainerManager supports queuing
+ ContainerQueuingLimit queuingLimit =
+ response.getContainerQueuingLimit();
+ if (queuingLimit != null) {
+ context.getContainerManager().updateQueuingLimit(queuingLimit);
+ }
+ }
+ // Handling node resource update case.
+ Resource newResource = response.getResource();
+ if (newResource != null) {
+ updateNMResource(newResource);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node's resource is updated to " +
+ newResource.toString());
+ }
+ }
+ if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
+ updateTimelineClientsAddress(response);
+ }
+
+ } catch (ConnectException e) {
+ //catch and throw the exception if tried MAX wait time to connect RM
+ dispatcher.getEventHandler().handle(
+ new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+ // failed to connect to RM.
+ failedToConnect = true;
+ throw new YarnRuntimeException(e);
+ } catch (Exception e) {
+
+ // TODO Better error handling. Thread can die with the rest of the
+ // NM still running.
+ LOG.error("Caught exception in status-updater", e);
+ } finally {
+ synchronized (heartbeatMonitor) {
+ nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
+ nextHeartBeatInterval;
+ try {
+ heartbeatMonitor.wait(nextHeartBeatInterval);
+ } catch (InterruptedException e) {
+ // Do Nothing
+ }
+ }
+ }
+ }
+ }
+
+ private void updateTimelineClientsAddress(
+ NodeHeartbeatResponse response) {
+ Map<ApplicationId, String> knownCollectorsMap =
+ response.getAppCollectorsMap();
+ if (knownCollectorsMap == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No collectors to update RM");
+ }
+ } else {
+ Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
+ knownCollectorsMap.entrySet();
+ for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
+ ApplicationId appId = entry.getKey();
+ String collectorAddr = entry.getValue();
+
+ // Only handle applications running on local node.
+ // Not include apps with timeline collectors running in local
+ Application application = context.getApplications().get(appId);
+ // TODO this logic could be problematic if the collector address
+ // gets updated due to NM restart or collector service failure
+ if (application != null &&
+ !context.getRegisteredCollectors().containsKey(appId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sync a new collector address: " + collectorAddr +
+ " for application: " + appId + " from RM.");
+ }
+ NMTimelinePublisher nmTimelinePublisher =
+ context.getNMTimelinePublisher();
+ if (nmTimelinePublisher != null) {
+ nmTimelinePublisher.setTimelineServiceAddress(
+ application.getAppId(), collectorAddr);
+ }
+ }
+ }
+ }
+ }
+
+ private void updateMasterKeys(NodeHeartbeatResponse response) {
+ // See if the master-key has rolled over
+ MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
+ if (updatedMasterKey != null) {
+ // Will be non-null only on roll-over on RM side
+ context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
+ }
+
+ updatedMasterKey = response.getNMTokenMasterKey();
+ if (updatedMasterKey != null) {
+ context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 1fcccde..0b599a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -213,12 +213,7 @@ public class ContainerLaunch implements Callable<Integer> {
DataOutputStream tokensOutStream = null;
// Select the working directory for the container
- Path containerWorkDir =
- dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
- + Path.SEPARATOR + user + Path.SEPARATOR
- + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
- + Path.SEPARATOR + containerIdStr,
- LocalDirAllocator.SIZE_UNKNOWN, false);
+ Path containerWorkDir = deriveContainerWorkDir();
recordContainerWorkDir(containerID, containerWorkDir.toString());
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
@@ -259,12 +254,8 @@ public class ContainerLaunch implements Callable<Integer> {
sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
containerLogDirs, localResources, nmPrivateClasspathJarDir);
- exec.prepareContainer(new ContainerPrepareContext.Builder()
- .setContainer(container)
- .setLocalizedResources(localResources)
- .setUser(user)
- .setContainerLocalDirs(containerLocalDirs)
- .setCommands(launchContext.getCommands()).build());
+ prepareContainer(localResources, containerLocalDirs);
+
// Write out the environment
exec.writeLaunchEnv(containerScriptOutStream, environment,
localResources, launchContext.getCommands(),
@@ -317,6 +308,39 @@ public class ContainerLaunch implements Callable<Integer> {
return ret;
}
+ private Path deriveContainerWorkDir() throws IOException {
+
+ final String containerWorkDirPath =
+ ContainerLocalizer.USERCACHE +
+ Path.SEPARATOR +
+ container.getUser() +
+ Path.SEPARATOR +
+ ContainerLocalizer.APPCACHE +
+ Path.SEPARATOR +
+ app.getAppId().toString() +
+ Path.SEPARATOR +
+ container.getContainerId().toString();
+
+ final Path containerWorkDir =
+ dirsHandler.getLocalPathForWrite(
+ containerWorkDirPath,
+ LocalDirAllocator.SIZE_UNKNOWN, false);
+
+ return containerWorkDir;
+ }
+
+ private void prepareContainer(Map<Path, List<String>> localResources,
+ List<String> containerLocalDirs) throws IOException {
+
+ exec.prepareContainer(new ContainerPrepareContext.Builder()
+ .setContainer(container)
+ .setLocalizedResources(localResources)
+ .setUser(container.getUser())
+ .setContainerLocalDirs(containerLocalDirs)
+ .setCommands(container.getLaunchContext().getCommands())
+ .build());
+ }
+
@SuppressWarnings("unchecked")
protected boolean validateContainerState() {
// CONTAINER_KILLED_ON_REQUEST should not be missed if the container
@@ -1116,98 +1140,9 @@ public class ContainerLaunch implements Callable<Integer> {
// TODO: Remove Windows check and use this approach on all platforms after
// additional testing. See YARN-358.
if (Shell.WINDOWS) {
-
- String inputClassPath = environment.get(Environment.CLASSPATH.name());
-
- if (inputClassPath != null && !inputClassPath.isEmpty()) {
-
- //On non-windows, localized resources
- //from distcache are available via the classpath as they were placed
- //there but on windows they are not available when the classpath
- //jar is created and so they "are lost" and have to be explicitly
- //added to the classpath instead. This also means that their position
- //is lost relative to other non-distcache classpath entries which will
- //break things like mapreduce.job.user.classpath.first. An environment
- //variable can be set to indicate that distcache entries should come
- //first
-
- boolean preferLocalizedJars = Boolean.parseBoolean(
- environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name())
- );
-
- boolean needsSeparator = false;
- StringBuilder newClassPath = new StringBuilder();
- if (!preferLocalizedJars) {
- newClassPath.append(inputClassPath);
- needsSeparator = true;
- }
-
- // Localized resources do not exist at the desired paths yet, because the
- // container launch script has not run to create symlinks yet. This
- // means that FileUtil.createJarWithClassPath can't automatically expand
- // wildcards to separate classpath entries for each file in the manifest.
- // To resolve this, append classpath entries explicitly for each
- // resource.
- for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
- boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
- .isDirectory();
-
- for (String linkName : entry.getValue()) {
- // Append resource.
- if (needsSeparator) {
- newClassPath.append(File.pathSeparator);
- } else {
- needsSeparator = true;
- }
- newClassPath.append(pwd.toString())
- .append(Path.SEPARATOR).append(linkName);
-
- // FileUtil.createJarWithClassPath must use File.toURI to convert
- // each file to a URI to write into the manifest's classpath. For
- // directories, the classpath must have a trailing '/', but
- // File.toURI only appends the trailing '/' if it is a directory that
- // already exists. To resolve this, add the classpath entries with
- // explicit trailing '/' here for any localized resource that targets
- // a directory. Then, FileUtil.createJarWithClassPath will guarantee
- // that the resulting entry in the manifest's classpath will have a
- // trailing '/', and thus refer to a directory instead of a file.
- if (targetIsDirectory) {
- newClassPath.append(Path.SEPARATOR);
- }
- }
- }
- if (preferLocalizedJars) {
- if (needsSeparator) {
- newClassPath.append(File.pathSeparator);
- }
- newClassPath.append(inputClassPath);
- }
- // When the container launches, it takes the parent process's environment
- // and then adds/overwrites with the entries from the container launch
- // context. Do the same thing here for correct substitution of
- // environment variables in the classpath jar manifest.
- Map<String, String> mergedEnv = new HashMap<String, String>(
- System.getenv());
- mergedEnv.putAll(environment);
-
- // this is hacky and temporary - it's to preserve the windows secure
- // behavior but enable non-secure windows to properly build the class
- // path for access to job.jar/lib/xyz and friends (see YARN-2803)
- Path jarDir;
- if (exec instanceof WindowsSecureContainerExecutor) {
- jarDir = nmPrivateClasspathJarDir;
- } else {
- jarDir = pwd;
- }
- String[] jarCp = FileUtil.createJarWithClassPath(
- newClassPath.toString(), jarDir, pwd, mergedEnv);
- // In a secure cluster the classpath jar must be localized to grant access
- Path localizedClassPathJar = exec.localizeClasspathJar(
- new Path(jarCp[0]), pwd, container.getUser());
- String replacementClassPath = localizedClassPathJar.toString() + jarCp[1];
- environment.put(Environment.CLASSPATH.name(), replacementClassPath);
- }
+ sanitizeWindowsEnv(environment, pwd,
+ resources, nmPrivateClasspathJarDir);
}
// put AuxiliaryService data to environment
for (Map.Entry<String, ByteBuffer> meta : containerManager
@@ -1217,6 +1152,103 @@ public class ContainerLaunch implements Callable<Integer> {
}
}
+ private void sanitizeWindowsEnv(Map<String, String> environment, Path pwd,
+ Map<Path, List<String>> resources, Path nmPrivateClasspathJarDir)
+ throws IOException {
+
+ String inputClassPath = environment.get(Environment.CLASSPATH.name());
+
+ if (inputClassPath != null && !inputClassPath.isEmpty()) {
+
+ //On non-windows, localized resources
+ //from distcache are available via the classpath as they were placed
+ //there but on windows they are not available when the classpath
+ //jar is created and so they "are lost" and have to be explicitly
+ //added to the classpath instead. This also means that their position
+ //is lost relative to other non-distcache classpath entries which will
+ //break things like mapreduce.job.user.classpath.first. An environment
+ //variable can be set to indicate that distcache entries should come
+ //first
+
+ boolean preferLocalizedJars = Boolean.parseBoolean(
+ environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name())
+ );
+
+ boolean needsSeparator = false;
+ StringBuilder newClassPath = new StringBuilder();
+ if (!preferLocalizedJars) {
+ newClassPath.append(inputClassPath);
+ needsSeparator = true;
+ }
+
+ // Localized resources do not exist at the desired paths yet, because the
+ // container launch script has not run to create symlinks yet. This
+ // means that FileUtil.createJarWithClassPath can't automatically expand
+ // wildcards to separate classpath entries for each file in the manifest.
+ // To resolve this, append classpath entries explicitly for each
+ // resource.
+ for (Map.Entry<Path, List<String>> entry : resources.entrySet()) {
+ boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
+ .isDirectory();
+
+ for (String linkName : entry.getValue()) {
+ // Append resource.
+ if (needsSeparator) {
+ newClassPath.append(File.pathSeparator);
+ } else {
+ needsSeparator = true;
+ }
+ newClassPath.append(pwd.toString())
+ .append(Path.SEPARATOR).append(linkName);
+
+ // FileUtil.createJarWithClassPath must use File.toURI to convert
+ // each file to a URI to write into the manifest's classpath. For
+ // directories, the classpath must have a trailing '/', but
+ // File.toURI only appends the trailing '/' if it is a directory that
+ // already exists. To resolve this, add the classpath entries with
+ // explicit trailing '/' here for any localized resource that targets
+ // a directory. Then, FileUtil.createJarWithClassPath will guarantee
+ // that the resulting entry in the manifest's classpath will have a
+ // trailing '/', and thus refer to a directory instead of a file.
+ if (targetIsDirectory) {
+ newClassPath.append(Path.SEPARATOR);
+ }
+ }
+ }
+ if (preferLocalizedJars) {
+ if (needsSeparator) {
+ newClassPath.append(File.pathSeparator);
+ }
+ newClassPath.append(inputClassPath);
+ }
+
+ // When the container launches, it takes the parent process's environment
+ // and then adds/overwrites with the entries from the container launch
+ // context. Do the same thing here for correct substitution of
+ // environment variables in the classpath jar manifest.
+ Map<String, String> mergedEnv = new HashMap<String, String>(
+ System.getenv());
+ mergedEnv.putAll(environment);
+
+ // this is hacky and temporary - it's to preserve the windows secure
+ // behavior but enable non-secure windows to properly build the class
+ // path for access to job.jar/lib/xyz and friends (see YARN-2803)
+ Path jarDir;
+ if (exec instanceof WindowsSecureContainerExecutor) {
+ jarDir = nmPrivateClasspathJarDir;
+ } else {
+ jarDir = pwd;
+ }
+ String[] jarCp = FileUtil.createJarWithClassPath(
+ newClassPath.toString(), jarDir, pwd, mergedEnv);
+ // In a secure cluster the classpath jar must be localized to grant access
+ Path localizedClassPathJar = exec.localizeClasspathJar(
+ new Path(jarCp[0]), pwd, container.getUser());
+ String replacementClassPath = localizedClassPathJar.toString() + jarCp[1];
+ environment.put(Environment.CLASSPATH.name(), replacementClassPath);
+ }
+ }
+
public static String getExitCodeFile(String pidFile) {
return pidFile + EXIT_CODE_FILE_SUFFIX;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
index b70a4e1..ed81331 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
@@ -424,10 +424,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
//List<String> -> stored as List -> fetched/converted to List<String>
//we can't do better here thanks to type-erasure
@SuppressWarnings("unchecked")
- List<String> localDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
- @SuppressWarnings("unchecked")
- List<String> logDirs = ctx.getExecutionAttribute(LOG_DIRS);
- @SuppressWarnings("unchecked")
List<String> filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS);
@SuppressWarnings("unchecked")
List<String> containerLocalDirs = ctx.getExecutionAttribute(
@@ -489,9 +485,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);
- Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute(
- NM_PRIVATE_CONTAINER_SCRIPT_PATH);
-
String disableOverride = environment.get(
ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE);
@@ -511,33 +504,8 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
String commandFile = dockerClient.writeCommandToTempFile(runCommand,
containerIdStr);
- PrivilegedOperation launchOp = new PrivilegedOperation(
- PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
-
- launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER),
- Integer.toString(PrivilegedOperation
- .RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
- ctx.getExecutionAttribute(APPID),
- containerIdStr, containerWorkDir.toString(),
- nmPrivateContainerScriptPath.toUri().getPath(),
- ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
- ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
- StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
- localDirs),
- StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
- logDirs),
- commandFile,
- resourcesOpts);
-
- String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
-
- if (tcCommandFile != null) {
- launchOp.appendArgs(tcCommandFile);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Launching container with cmd: " + runCommand
- .getCommandWithArguments());
- }
+ PrivilegedOperation launchOp = buildLaunchOp(ctx,
+ commandFile, runCommand);
try {
privilegedOperationExecutor.executePrivilegedOperation(null,
@@ -635,4 +603,53 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
}
return null;
}
+
+
+
+ private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
+ String commandFile, DockerRunCommand runCommand) {
+
+ String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
+ String containerIdStr = ctx.getContainer().getContainerId().toString();
+ Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute(
+ NM_PRIVATE_CONTAINER_SCRIPT_PATH);
+ Path containerWorkDir = ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
+ //we can't do better here thanks to type-erasure
+ @SuppressWarnings("unchecked")
+ List<String> localDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
+ @SuppressWarnings("unchecked")
+ List<String> logDirs = ctx.getExecutionAttribute(LOG_DIRS);
+ String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);
+
+ PrivilegedOperation launchOp = new PrivilegedOperation(
+ PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
+
+ launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER),
+ Integer.toString(PrivilegedOperation
+ .RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
+ ctx.getExecutionAttribute(APPID),
+ containerIdStr,
+ containerWorkDir.toString(),
+ nmPrivateContainerScriptPath.toUri().getPath(),
+ ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
+ ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ localDirs),
+ StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+ logDirs),
+ commandFile,
+ resourcesOpts);
+
+ String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
+
+ if (tcCommandFile != null) {
+ launchOp.appendArgs(tcCommandFile);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Launching container with cmd: " + runCommand
+ .getCommandWithArguments());
+ }
+
+ return launchOp;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org