You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by te...@apache.org on 2017/05/10 17:46:56 UTC

hadoop git commit: YARN-6475. Fix some long function checkstyle issues (Contributed by Soumabrata Chakraborty via Daniel Templeton)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 1e71fe8c4 -> 74a61438c


YARN-6475. Fix some long function checkstyle issues
(Contributed by Soumabrata Chakraborty via Daniel Templeton)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74a61438
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74a61438
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74a61438

Branch: refs/heads/trunk
Commit: 74a61438ca01e2191b54000af73b654a2d0b8253
Parents: 1e71fe8
Author: Daniel Templeton <te...@apache.org>
Authored: Wed May 10 10:45:02 2017 -0700
Committer: Daniel Templeton <te...@apache.org>
Committed: Wed May 10 10:46:50 2017 -0700

----------------------------------------------------------------------
 .../nodemanager/LinuxContainerExecutor.java     |  92 ++---
 .../nodemanager/NodeStatusUpdaterImpl.java      | 390 ++++++++++---------
 .../launcher/ContainerLaunch.java               | 238 ++++++-----
 .../runtime/DockerLinuxContainerRuntime.java    |  85 ++--
 4 files changed, 429 insertions(+), 376 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index cb1d53d..9a3b2d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -62,7 +62,6 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.regex.Pattern;
 
 import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.*;
@@ -442,24 +441,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   public int launchContainer(ContainerStartContext ctx)
       throws IOException, ConfigurationException {
     Container container = ctx.getContainer();
-    Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
-    Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
     String user = ctx.getUser();
-    String appId = ctx.getAppId();
-    Path containerWorkDir = ctx.getContainerWorkDir();
-    List<String> localDirs = ctx.getLocalDirs();
-    List<String> logDirs = ctx.getLogDirs();
-    List<String> filecacheDirs = ctx.getFilecacheDirs();
-    List<String> userLocalDirs = ctx.getUserLocalDirs();
-    List<String> containerLocalDirs = ctx.getContainerLocalDirs();
-    List<String> containerLogDirs = ctx.getContainerLogDirs();
-    Map<Path, List<String>> localizedResources = ctx.getLocalizedResources();
 
     verifyUsernamePattern(user);
-    String runAsUser = getRunAsUser(user);
 
     ContainerId containerId = container.getContainerId();
-    String containerIdStr = containerId.toString();
 
     resourcesHandler.preExecute(containerId,
             container.getResource());
@@ -514,39 +500,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
     try {
       Path pidFilePath = getPidFilePath(containerId);
       if (pidFilePath != null) {
-        List<String> prefixCommands = new ArrayList<>();
-        ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
-            .Builder(container);
 
-        addSchedPriorityCommand(prefixCommands);
-        if (prefixCommands.size() > 0) {
-          builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
-              prefixCommands);
-        }
+        ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
+            ctx, pidFilePath, resourcesOptions, tcCommandFile);
 
-        builder.setExecutionAttribute(LOCALIZED_RESOURCES, localizedResources)
-            .setExecutionAttribute(RUN_AS_USER, runAsUser)
-            .setExecutionAttribute(USER, user)
-            .setExecutionAttribute(APPID, appId)
-            .setExecutionAttribute(CONTAINER_ID_STR, containerIdStr)
-            .setExecutionAttribute(CONTAINER_WORK_DIR, containerWorkDir)
-            .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
-                nmPrivateContainerScriptPath)
-            .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH, nmPrivateTokensPath)
-            .setExecutionAttribute(PID_FILE_PATH, pidFilePath)
-            .setExecutionAttribute(LOCAL_DIRS, localDirs)
-            .setExecutionAttribute(LOG_DIRS, logDirs)
-            .setExecutionAttribute(FILECACHE_DIRS, filecacheDirs)
-            .setExecutionAttribute(USER_LOCAL_DIRS, userLocalDirs)
-            .setExecutionAttribute(CONTAINER_LOCAL_DIRS, containerLocalDirs)
-            .setExecutionAttribute(CONTAINER_LOG_DIRS, containerLogDirs)
-            .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);
-
-        if (tcCommandFile != null) {
-          builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
-        }
-
-        linuxContainerRuntime.launchContainer(builder.build());
+        linuxContainerRuntime.launchContainer(runtimeContext);
       } else {
         LOG.info(
             "Container was marked as inactive. Returning terminated error");
@@ -617,6 +575,50 @@ public class LinuxContainerExecutor extends ContainerExecutor {
     return 0;
   }
 
+  private ContainerRuntimeContext buildContainerRuntimeContext(
+      ContainerStartContext ctx, Path pidFilePath,
+      String resourcesOptions, String tcCommandFile) {
+
+    List<String> prefixCommands = new ArrayList<>();
+    addSchedPriorityCommand(prefixCommands);
+
+    Container container = ctx.getContainer();
+
+    ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext
+            .Builder(container);
+    if (prefixCommands.size() > 0) {
+      builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS,
+              prefixCommands);
+    }
+
+    builder.setExecutionAttribute(LOCALIZED_RESOURCES,
+        ctx.getLocalizedResources())
+      .setExecutionAttribute(RUN_AS_USER, getRunAsUser(ctx.getUser()))
+      .setExecutionAttribute(USER, ctx.getUser())
+      .setExecutionAttribute(APPID, ctx.getAppId())
+      .setExecutionAttribute(CONTAINER_ID_STR,
+        container.getContainerId().toString())
+      .setExecutionAttribute(CONTAINER_WORK_DIR, ctx.getContainerWorkDir())
+      .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH,
+        ctx.getNmPrivateContainerScriptPath())
+      .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH,
+        ctx.getNmPrivateTokensPath())
+      .setExecutionAttribute(PID_FILE_PATH, pidFilePath)
+      .setExecutionAttribute(LOCAL_DIRS, ctx.getLocalDirs())
+      .setExecutionAttribute(LOG_DIRS, ctx.getLogDirs())
+      .setExecutionAttribute(FILECACHE_DIRS, ctx.getFilecacheDirs())
+      .setExecutionAttribute(USER_LOCAL_DIRS, ctx.getUserLocalDirs())
+      .setExecutionAttribute(CONTAINER_LOCAL_DIRS, ctx.getContainerLocalDirs())
+      .setExecutionAttribute(CONTAINER_LOG_DIRS, ctx.getContainerLogDirs())
+      .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions);
+
+    if (tcCommandFile != null) {
+      builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile);
+    }
+
+    return builder.build();
+  }
+
   @Override
   public String[] getIpAndHost(Container container) {
     return linuxContainerRuntime.getIpAndHost(container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index dd5b279..00073d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -761,200 +761,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   protected void startStatusUpdater() {
 
-    statusUpdaterRunnable = new Runnable() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void run() {
-        int lastHeartbeatID = 0;
-        while (!isStopped) {
-          // Send heartbeat
-          try {
-            NodeHeartbeatResponse response = null;
-            Set<NodeLabel> nodeLabelsForHeartbeat =
-                nodeLabelsHandler.getNodeLabelsForHeartbeat();
-            NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
-            NodeHeartbeatRequest request =
-                NodeHeartbeatRequest.newInstance(nodeStatus,
-                    NodeStatusUpdaterImpl.this.context
-                        .getContainerTokenSecretManager().getCurrentKey(),
-                    NodeStatusUpdaterImpl.this.context
-                        .getNMTokenSecretManager().getCurrentKey(),
-                    nodeLabelsForHeartbeat,
-                    NodeStatusUpdaterImpl.this.context
-                        .getRegisteredCollectors());
-
-            if (logAggregationEnabled) {
-              // pull log aggregation status for application running in this NM
-              List<LogAggregationReport> logAggregationReports =
-                  getLogAggregationReportsForApps(context
-                    .getLogAggregationStatusForApps());
-              if (logAggregationReports != null
-                  && !logAggregationReports.isEmpty()) {
-                request.setLogAggregationReportsForApps(logAggregationReports);
-              }
-            }
-
-            response = resourceTracker.nodeHeartbeat(request);
-            //get next heartbeat interval from response
-            nextHeartBeatInterval = response.getNextHeartBeatInterval();
-            updateMasterKeys(response);
-
-            if (!handleShutdownOrResyncCommand(response)) {
-              nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
-                  response);
-
-              // Explicitly put this method after checking the resync
-              // response. We
-              // don't want to remove the completed containers before resync
-              // because these completed containers will be reported back to RM
-              // when NM re-registers with RM.
-              // Only remove the cleanedup containers that are acked
-              removeOrTrackCompletedContainersFromContext(response
-                  .getContainersToBeRemovedFromNM());
-
-              logAggregationReportForAppsTempList.clear();
-              lastHeartbeatID = response.getResponseId();
-              List<ContainerId> containersToCleanup = response
-                  .getContainersToCleanup();
-              if (!containersToCleanup.isEmpty()) {
-                dispatcher.getEventHandler().handle(
-                    new CMgrCompletedContainersEvent(containersToCleanup,
-                        CMgrCompletedContainersEvent.Reason
-                            .BY_RESOURCEMANAGER));
-              }
-              List<ApplicationId> appsToCleanup =
-                  response.getApplicationsToCleanup();
-              //Only start tracking for keepAlive on FINISH_APP
-              trackAppsForKeepAlive(appsToCleanup);
-              if (!appsToCleanup.isEmpty()) {
-                dispatcher.getEventHandler().handle(
-                    new CMgrCompletedAppsEvent(appsToCleanup,
-                        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
-              }
-              Map<ApplicationId, ByteBuffer> systemCredentials =
-                  response.getSystemCredentialsForApps();
-              if (systemCredentials != null && !systemCredentials.isEmpty()) {
-                ((NMContext) context).setSystemCrendentialsForApps(
-                    parseCredentials(systemCredentials));
-              }
-              List<org.apache.hadoop.yarn.api.records.Container>
-                  containersToDecrease = response.getContainersToDecrease();
-              if (!containersToDecrease.isEmpty()) {
-                dispatcher.getEventHandler().handle(
-                    new CMgrDecreaseContainersResourceEvent(
-                        containersToDecrease)
-                );
-              }
-
-              // SignalContainer request originally comes from end users via
-              // ClientRMProtocol's SignalContainer. Forward the request to
-              // ContainerManager which will dispatch the event to
-              // ContainerLauncher.
-              List<SignalContainerRequest> containersToSignal = response
-                  .getContainersToSignalList();
-              if (containersToSignal.size() != 0) {
-                dispatcher.getEventHandler().handle(
-                    new CMgrSignalContainersEvent(containersToSignal));
-              }
-
-              // Update QueuingLimits if ContainerManager supports queuing
-              ContainerQueuingLimit queuingLimit =
-                  response.getContainerQueuingLimit();
-              if (queuingLimit != null) {
-                context.getContainerManager().updateQueuingLimit(queuingLimit);
-              }
-            }
-            // Handling node resource update case.
-            Resource newResource = response.getResource();
-            if (newResource != null) {
-              updateNMResource(newResource);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Node's resource is updated to " +
-                    newResource.toString());
-              }
-            }
-            if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
-              updateTimelineClientsAddress(response);
-            }
-
-          } catch (ConnectException e) {
-            //catch and throw the exception if tried MAX wait time to connect RM
-            dispatcher.getEventHandler().handle(
-                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
-            // failed to connect to RM.
-            failedToConnect = true;
-            throw new YarnRuntimeException(e);
-          } catch (Throwable e) {
-
-            // TODO Better error handling. Thread can die with the rest of the
-            // NM still running.
-            LOG.error("Caught exception in status-updater", e);
-          } finally {
-            synchronized (heartbeatMonitor) {
-              nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
-                  YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
-                    nextHeartBeatInterval;
-              try {
-                heartbeatMonitor.wait(nextHeartBeatInterval);
-              } catch (InterruptedException e) {
-                // Do Nothing
-              }
-            }
-          }
-        }
-      }
-
-      private void updateTimelineClientsAddress(
-          NodeHeartbeatResponse response) {
-        Map<ApplicationId, String> knownCollectorsMap =
-            response.getAppCollectorsMap();
-        if (knownCollectorsMap == null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("No collectors to update RM");
-          }
-        } else {
-          Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
-              knownCollectorsMap.entrySet();
-          for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
-            ApplicationId appId = entry.getKey();
-            String collectorAddr = entry.getValue();
-
-            // Only handle applications running on local node.
-            // Not include apps with timeline collectors running in local
-            Application application = context.getApplications().get(appId);
-            // TODO this logic could be problematic if the collector address
-            // gets updated due to NM restart or collector service failure
-            if (application != null &&
-                !context.getRegisteredCollectors().containsKey(appId)) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Sync a new collector address: " + collectorAddr +
-                    " for application: " + appId + " from RM.");
-              }
-              NMTimelinePublisher nmTimelinePublisher =
-                  context.getNMTimelinePublisher();
-              if (nmTimelinePublisher != null) {
-                nmTimelinePublisher.setTimelineServiceAddress(
-                    application.getAppId(), collectorAddr);
-              }
-            }
-          }
-        }
-      }
-
-      private void updateMasterKeys(NodeHeartbeatResponse response) {
-        // See if the master-key has rolled over
-        MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
-        if (updatedMasterKey != null) {
-          // Will be non-null only on roll-over on RM side
-          context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
-        }
-        
-        updatedMasterKey = response.getNMTokenMasterKey();
-        if (updatedMasterKey != null) {
-          context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
-        }
-      }
-    };
+    statusUpdaterRunnable = new StatusUpdaterRunnable();
     statusUpdater =
         new Thread(statusUpdaterRunnable, "Node Status Updater");
     statusUpdater.start();
@@ -1215,4 +1022,199 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       }
     }
   }
+
+  private class StatusUpdaterRunnable implements Runnable {
+    @Override
+    @SuppressWarnings("unchecked")
+    public void run() {
+      int lastHeartbeatID = 0;
+      while (!isStopped) {
+        // Send heartbeat
+        try {
+          NodeHeartbeatResponse response = null;
+          Set<NodeLabel> nodeLabelsForHeartbeat =
+              nodeLabelsHandler.getNodeLabelsForHeartbeat();
+          NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
+          NodeHeartbeatRequest request =
+              NodeHeartbeatRequest.newInstance(nodeStatus,
+                  NodeStatusUpdaterImpl.this.context
+                      .getContainerTokenSecretManager().getCurrentKey(),
+                  NodeStatusUpdaterImpl.this.context
+                      .getNMTokenSecretManager().getCurrentKey(),
+                  nodeLabelsForHeartbeat,
+                  NodeStatusUpdaterImpl.this.context
+                      .getRegisteredCollectors());
+
+          if (logAggregationEnabled) {
+            // pull log aggregation status for application running in this NM
+            List<LogAggregationReport> logAggregationReports =
+                getLogAggregationReportsForApps(context
+                    .getLogAggregationStatusForApps());
+            if (logAggregationReports != null
+                && !logAggregationReports.isEmpty()) {
+              request.setLogAggregationReportsForApps(logAggregationReports);
+            }
+          }
+
+          response = resourceTracker.nodeHeartbeat(request);
+          //get next heartbeat interval from response
+          nextHeartBeatInterval = response.getNextHeartBeatInterval();
+          updateMasterKeys(response);
+
+          if (!handleShutdownOrResyncCommand(response)) {
+            nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
+                response);
+
+            // Explicitly put this method after checking the resync
+            // response. We
+            // don't want to remove the completed containers before resync
+            // because these completed containers will be reported back to RM
+            // when NM re-registers with RM.
+            // Only remove the cleanedup containers that are acked
+            removeOrTrackCompletedContainersFromContext(response
+                .getContainersToBeRemovedFromNM());
+
+            logAggregationReportForAppsTempList.clear();
+            lastHeartbeatID = response.getResponseId();
+            List<ContainerId> containersToCleanup = response
+                .getContainersToCleanup();
+            if (!containersToCleanup.isEmpty()) {
+              dispatcher.getEventHandler().handle(
+                  new CMgrCompletedContainersEvent(containersToCleanup,
+                      CMgrCompletedContainersEvent.Reason
+                          .BY_RESOURCEMANAGER));
+            }
+            List<ApplicationId> appsToCleanup =
+                response.getApplicationsToCleanup();
+            //Only start tracking for keepAlive on FINISH_APP
+            trackAppsForKeepAlive(appsToCleanup);
+            if (!appsToCleanup.isEmpty()) {
+              dispatcher.getEventHandler().handle(
+                  new CMgrCompletedAppsEvent(appsToCleanup,
+                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+            }
+            Map<ApplicationId, ByteBuffer> systemCredentials =
+                response.getSystemCredentialsForApps();
+            if (systemCredentials != null && !systemCredentials.isEmpty()) {
+              ((NMContext) context).setSystemCrendentialsForApps(
+                  parseCredentials(systemCredentials));
+            }
+            List<org.apache.hadoop.yarn.api.records.Container>
+                containersToDecrease = response.getContainersToDecrease();
+            if (!containersToDecrease.isEmpty()) {
+              dispatcher.getEventHandler().handle(
+                  new CMgrDecreaseContainersResourceEvent(
+                      containersToDecrease)
+              );
+            }
+
+            // SignalContainer request originally comes from end users via
+            // ClientRMProtocol's SignalContainer. Forward the request to
+            // ContainerManager which will dispatch the event to
+            // ContainerLauncher.
+            List<SignalContainerRequest> containersToSignal = response
+                .getContainersToSignalList();
+            if (!containersToSignal.isEmpty()) {
+              dispatcher.getEventHandler().handle(
+                  new CMgrSignalContainersEvent(containersToSignal));
+            }
+
+            // Update QueuingLimits if ContainerManager supports queuing
+            ContainerQueuingLimit queuingLimit =
+                response.getContainerQueuingLimit();
+            if (queuingLimit != null) {
+              context.getContainerManager().updateQueuingLimit(queuingLimit);
+            }
+          }
+          // Handling node resource update case.
+          Resource newResource = response.getResource();
+          if (newResource != null) {
+            updateNMResource(newResource);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Node's resource is updated to " +
+                  newResource.toString());
+            }
+          }
+          if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
+            updateTimelineClientsAddress(response);
+          }
+
+        } catch (ConnectException e) {
+          //catch and throw the exception if tried MAX wait time to connect RM
+          dispatcher.getEventHandler().handle(
+              new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+          // failed to connect to RM.
+          failedToConnect = true;
+          throw new YarnRuntimeException(e);
+        } catch (Exception e) {
+
+          // TODO Better error handling. Thread can die with the rest of the
+          // NM still running.
+          LOG.error("Caught exception in status-updater", e);
+        } finally {
+          synchronized (heartbeatMonitor) {
+            nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
+                YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
+                nextHeartBeatInterval;
+            try {
+              heartbeatMonitor.wait(nextHeartBeatInterval);
+            } catch (InterruptedException e) {
+              // Do Nothing
+            }
+          }
+        }
+      }
+    }
+
+    private void updateTimelineClientsAddress(
+        NodeHeartbeatResponse response) {
+      Map<ApplicationId, String> knownCollectorsMap =
+          response.getAppCollectorsMap();
+      if (knownCollectorsMap == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No collectors to update RM");
+        }
+      } else {
+        Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
+            knownCollectorsMap.entrySet();
+        for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
+          ApplicationId appId = entry.getKey();
+          String collectorAddr = entry.getValue();
+
+          // Only handle applications running on local node.
+          // Not include apps with timeline collectors running in local
+          Application application = context.getApplications().get(appId);
+          // TODO this logic could be problematic if the collector address
+          // gets updated due to NM restart or collector service failure
+          if (application != null &&
+              !context.getRegisteredCollectors().containsKey(appId)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sync a new collector address: " + collectorAddr +
+                      " for application: " + appId + " from RM.");
+            }
+            NMTimelinePublisher nmTimelinePublisher =
+                context.getNMTimelinePublisher();
+            if (nmTimelinePublisher != null) {
+              nmTimelinePublisher.setTimelineServiceAddress(
+                  application.getAppId(), collectorAddr);
+            }
+          }
+        }
+      }
+    }
+
+    private void updateMasterKeys(NodeHeartbeatResponse response) {
+      // See if the master-key has rolled over
+      MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
+      if (updatedMasterKey != null) {
+        // Will be non-null only on roll-over on RM side
+        context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
+      }
+
+      updatedMasterKey = response.getNMTokenMasterKey();
+      if (updatedMasterKey != null) {
+        context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 1fcccde..0b599a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -213,12 +213,7 @@ public class ContainerLaunch implements Callable<Integer> {
       DataOutputStream tokensOutStream = null;
 
       // Select the working directory for the container
-      Path containerWorkDir =
-          dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
-                  + Path.SEPARATOR + user + Path.SEPARATOR
-                  + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
-                  + Path.SEPARATOR + containerIdStr,
-              LocalDirAllocator.SIZE_UNKNOWN, false);
+      Path containerWorkDir = deriveContainerWorkDir();
       recordContainerWorkDir(containerID, containerWorkDir.toString());
 
       String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
@@ -259,12 +254,8 @@ public class ContainerLaunch implements Callable<Integer> {
         sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
             containerLogDirs, localResources, nmPrivateClasspathJarDir);
 
-        exec.prepareContainer(new ContainerPrepareContext.Builder()
-            .setContainer(container)
-            .setLocalizedResources(localResources)
-            .setUser(user)
-            .setContainerLocalDirs(containerLocalDirs)
-            .setCommands(launchContext.getCommands()).build());
+        prepareContainer(localResources, containerLocalDirs);
+
         // Write out the environment
         exec.writeLaunchEnv(containerScriptOutStream, environment,
             localResources, launchContext.getCommands(),
@@ -317,6 +308,39 @@ public class ContainerLaunch implements Callable<Integer> {
     return ret;
   }
 
+  private Path deriveContainerWorkDir() throws IOException {
+
+    final String containerWorkDirPath =
+        ContainerLocalizer.USERCACHE +
+        Path.SEPARATOR +
+        container.getUser() +
+        Path.SEPARATOR +
+        ContainerLocalizer.APPCACHE +
+        Path.SEPARATOR +
+        app.getAppId().toString() +
+        Path.SEPARATOR +
+        container.getContainerId().toString();
+
+    final Path containerWorkDir =
+        dirsHandler.getLocalPathForWrite(
+          containerWorkDirPath,
+          LocalDirAllocator.SIZE_UNKNOWN, false);
+
+    return containerWorkDir;
+  }
+
+  private void prepareContainer(Map<Path, List<String>> localResources,
+      List<String> containerLocalDirs) throws IOException {
+
+    exec.prepareContainer(new ContainerPrepareContext.Builder()
+        .setContainer(container)
+        .setLocalizedResources(localResources)
+        .setUser(container.getUser())
+        .setContainerLocalDirs(containerLocalDirs)
+        .setCommands(container.getLaunchContext().getCommands())
+        .build());
+  }
+
   @SuppressWarnings("unchecked")
   protected boolean validateContainerState() {
     // CONTAINER_KILLED_ON_REQUEST should not be missed if the container
@@ -1116,98 +1140,9 @@ public class ContainerLaunch implements Callable<Integer> {
     // TODO: Remove Windows check and use this approach on all platforms after
     // additional testing.  See YARN-358.
     if (Shell.WINDOWS) {
-      
-      String inputClassPath = environment.get(Environment.CLASSPATH.name());
-
-      if (inputClassPath != null && !inputClassPath.isEmpty()) {
-
-        //On non-windows, localized resources
-        //from distcache are available via the classpath as they were placed
-        //there but on windows they are not available when the classpath
-        //jar is created and so they "are lost" and have to be explicitly
-        //added to the classpath instead.  This also means that their position
-        //is lost relative to other non-distcache classpath entries which will
-        //break things like mapreduce.job.user.classpath.first.  An environment
-        //variable can be set to indicate that distcache entries should come
-        //first
-
-        boolean preferLocalizedJars = Boolean.parseBoolean(
-          environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name())
-          );
-
-        boolean needsSeparator = false;
-        StringBuilder newClassPath = new StringBuilder();
-        if (!preferLocalizedJars) {
-          newClassPath.append(inputClassPath);
-          needsSeparator = true;
-        }
-
-        // Localized resources do not exist at the desired paths yet, because the
-        // container launch script has not run to create symlinks yet.  This
-        // means that FileUtil.createJarWithClassPath can't automatically expand
-        // wildcards to separate classpath entries for each file in the manifest.
-        // To resolve this, append classpath entries explicitly for each
-        // resource.
-        for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
-          boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
-            .isDirectory();
-
-          for (String linkName : entry.getValue()) {
-            // Append resource.
-            if (needsSeparator) {
-              newClassPath.append(File.pathSeparator);
-            } else {
-              needsSeparator = true;
-            }
-            newClassPath.append(pwd.toString())
-              .append(Path.SEPARATOR).append(linkName);
-
-            // FileUtil.createJarWithClassPath must use File.toURI to convert
-            // each file to a URI to write into the manifest's classpath.  For
-            // directories, the classpath must have a trailing '/', but
-            // File.toURI only appends the trailing '/' if it is a directory that
-            // already exists.  To resolve this, add the classpath entries with
-            // explicit trailing '/' here for any localized resource that targets
-            // a directory.  Then, FileUtil.createJarWithClassPath will guarantee
-            // that the resulting entry in the manifest's classpath will have a
-            // trailing '/', and thus refer to a directory instead of a file.
-            if (targetIsDirectory) {
-              newClassPath.append(Path.SEPARATOR);
-            }
-          }
-        }
-        if (preferLocalizedJars) {
-          if (needsSeparator) {
-            newClassPath.append(File.pathSeparator);
-          }
-          newClassPath.append(inputClassPath);
-        }
 
-        // When the container launches, it takes the parent process's environment
-        // and then adds/overwrites with the entries from the container launch
-        // context.  Do the same thing here for correct substitution of
-        // environment variables in the classpath jar manifest.
-        Map<String, String> mergedEnv = new HashMap<String, String>(
-          System.getenv());
-        mergedEnv.putAll(environment);
-        
-        // this is hacky and temporary - it's to preserve the windows secure
-        // behavior but enable non-secure windows to properly build the class
-        // path for access to job.jar/lib/xyz and friends (see YARN-2803)
-        Path jarDir;
-        if (exec instanceof WindowsSecureContainerExecutor) {
-          jarDir = nmPrivateClasspathJarDir;
-        } else {
-          jarDir = pwd; 
-        }
-        String[] jarCp = FileUtil.createJarWithClassPath(
-          newClassPath.toString(), jarDir, pwd, mergedEnv);
-        // In a secure cluster the classpath jar must be localized to grant access
-        Path localizedClassPathJar = exec.localizeClasspathJar(
-            new Path(jarCp[0]), pwd, container.getUser());
-        String replacementClassPath = localizedClassPathJar.toString() + jarCp[1];
-        environment.put(Environment.CLASSPATH.name(), replacementClassPath);
-      }
+      sanitizeWindowsEnv(environment, pwd,
+          resources, nmPrivateClasspathJarDir);
     }
     // put AuxiliaryService data to environment
     for (Map.Entry<String, ByteBuffer> meta : containerManager
@@ -1217,6 +1152,103 @@ public class ContainerLaunch implements Callable<Integer> {
     }
   }
 
+  private void sanitizeWindowsEnv(Map<String, String> environment, Path pwd,
+      Map<Path, List<String>> resources, Path nmPrivateClasspathJarDir)
+      throws IOException {
+
+    String inputClassPath = environment.get(Environment.CLASSPATH.name());
+
+    if (inputClassPath != null && !inputClassPath.isEmpty()) {
+
+      //On non-windows, localized resources
+      //from distcache are available via the classpath as they were placed
+      //there but on windows they are not available when the classpath
+      //jar is created and so they "are lost" and have to be explicitly
+      //added to the classpath instead.  This also means that their position
+      //is lost relative to other non-distcache classpath entries which will
+      //break things like mapreduce.job.user.classpath.first.  An environment
+      //variable can be set to indicate that distcache entries should come
+      //first
+
+      boolean preferLocalizedJars = Boolean.parseBoolean(
+              environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name())
+      );
+
+      boolean needsSeparator = false;
+      StringBuilder newClassPath = new StringBuilder();
+      if (!preferLocalizedJars) {
+        newClassPath.append(inputClassPath);
+        needsSeparator = true;
+      }
+
+      // Localized resources do not exist at the desired paths yet, because the
+      // container launch script has not run to create symlinks yet.  This
+      // means that FileUtil.createJarWithClassPath can't automatically expand
+      // wildcards to separate classpath entries for each file in the manifest.
+      // To resolve this, append classpath entries explicitly for each
+      // resource.
+      for (Map.Entry<Path, List<String>> entry : resources.entrySet()) {
+        boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
+                .isDirectory();
+
+        for (String linkName : entry.getValue()) {
+          // Append resource.
+          if (needsSeparator) {
+            newClassPath.append(File.pathSeparator);
+          } else {
+            needsSeparator = true;
+          }
+          newClassPath.append(pwd.toString())
+                  .append(Path.SEPARATOR).append(linkName);
+
+          // FileUtil.createJarWithClassPath must use File.toURI to convert
+          // each file to a URI to write into the manifest's classpath.  For
+          // directories, the classpath must have a trailing '/', but
+          // File.toURI only appends the trailing '/' if it is a directory that
+          // already exists.  To resolve this, add the classpath entries with
+          // explicit trailing '/' here for any localized resource that targets
+          // a directory.  Then, FileUtil.createJarWithClassPath will guarantee
+          // that the resulting entry in the manifest's classpath will have a
+          // trailing '/', and thus refer to a directory instead of a file.
+          if (targetIsDirectory) {
+            newClassPath.append(Path.SEPARATOR);
+          }
+        }
+      }
+      if (preferLocalizedJars) {
+        if (needsSeparator) {
+          newClassPath.append(File.pathSeparator);
+        }
+        newClassPath.append(inputClassPath);
+      }
+
+      // When the container launches, it takes the parent process's environment
+      // and then adds/overwrites with the entries from the container launch
+      // context.  Do the same thing here for correct substitution of
+      // environment variables in the classpath jar manifest.
+      Map<String, String> mergedEnv = new HashMap<String, String>(
+              System.getenv());
+      mergedEnv.putAll(environment);
+
+      // this is hacky and temporary - it's to preserve the windows secure
+      // behavior but enable non-secure windows to properly build the class
+      // path for access to job.jar/lib/xyz and friends (see YARN-2803)
+      Path jarDir;
+      if (exec instanceof WindowsSecureContainerExecutor) {
+        jarDir = nmPrivateClasspathJarDir;
+      } else {
+        jarDir = pwd;
+      }
+      String[] jarCp = FileUtil.createJarWithClassPath(
+              newClassPath.toString(), jarDir, pwd, mergedEnv);
+      // In a secure cluster the classpath jar must be localized to grant access
+      Path localizedClassPathJar = exec.localizeClasspathJar(
+              new Path(jarCp[0]), pwd, container.getUser());
+      String replacementClassPath = localizedClassPathJar.toString() + jarCp[1];
+      environment.put(Environment.CLASSPATH.name(), replacementClassPath);
+    }
+  }
+
   public static String getExitCodeFile(String pidFile) {
     return pidFile + EXIT_CODE_FILE_SUFFIX;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74a61438/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
index b70a4e1..ed81331 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
@@ -424,10 +424,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
     //List<String> -> stored as List -> fetched/converted to List<String>
     //we can't do better here thanks to type-erasure
     @SuppressWarnings("unchecked")
-    List<String> localDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
-    @SuppressWarnings("unchecked")
-    List<String> logDirs = ctx.getExecutionAttribute(LOG_DIRS);
-    @SuppressWarnings("unchecked")
     List<String> filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS);
     @SuppressWarnings("unchecked")
     List<String> containerLocalDirs = ctx.getExecutionAttribute(
@@ -489,9 +485,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
 
     addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);
 
-    Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute(
-        NM_PRIVATE_CONTAINER_SCRIPT_PATH);
-
     String disableOverride = environment.get(
         ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE);
 
@@ -511,33 +504,8 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
 
     String commandFile = dockerClient.writeCommandToTempFile(runCommand,
         containerIdStr);
-    PrivilegedOperation launchOp = new PrivilegedOperation(
-        PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
-
-    launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER),
-        Integer.toString(PrivilegedOperation
-            .RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
-        ctx.getExecutionAttribute(APPID),
-        containerIdStr, containerWorkDir.toString(),
-        nmPrivateContainerScriptPath.toUri().getPath(),
-        ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
-        ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
-        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
-            localDirs),
-        StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
-            logDirs),
-        commandFile,
-        resourcesOpts);
-
-    String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
-
-    if (tcCommandFile != null) {
-      launchOp.appendArgs(tcCommandFile);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Launching container with cmd: " + runCommand
-          .getCommandWithArguments());
-    }
+    PrivilegedOperation launchOp = buildLaunchOp(ctx,
+        commandFile, runCommand);
 
     try {
       privilegedOperationExecutor.executePrivilegedOperation(null,
@@ -635,4 +603,53 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
     }
     return null;
   }
+
+
+
+  private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
+      String commandFile, DockerRunCommand runCommand) {
+
+    String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
+    String containerIdStr = ctx.getContainer().getContainerId().toString();
+    Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute(
+            NM_PRIVATE_CONTAINER_SCRIPT_PATH);
+    Path containerWorkDir = ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
+    //we can't do better here thanks to type-erasure
+    @SuppressWarnings("unchecked")
+    List<String> localDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
+    @SuppressWarnings("unchecked")
+    List<String> logDirs = ctx.getExecutionAttribute(LOG_DIRS);
+    String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);
+
+    PrivilegedOperation launchOp = new PrivilegedOperation(
+            PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
+
+    launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER),
+            Integer.toString(PrivilegedOperation
+                    .RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
+            ctx.getExecutionAttribute(APPID),
+            containerIdStr,
+            containerWorkDir.toString(),
+            nmPrivateContainerScriptPath.toUri().getPath(),
+            ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
+            ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
+            StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+                    localDirs),
+            StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
+                    logDirs),
+            commandFile,
+            resourcesOpts);
+
+    String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
+
+    if (tcCommandFile != null) {
+      launchOp.appendArgs(tcCommandFile);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Launching container with cmd: " + runCommand
+              .getCommandWithArguments());
+    }
+
+    return launchOp;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org