You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2015/12/11 20:19:29 UTC

[2/2] hadoop git commit: YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no impact when it's turned off. Contributed by Sangjin Lee.

YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no
impact when it's turned off. Contributed by Sangjin Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/45510fc6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/45510fc6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/45510fc6

Branch: refs/heads/feature-YARN-2928
Commit: 45510fc6bdcfe48aa8aa192f511ebfdada370c78
Parents: 0801563
Author: Li Lu <gt...@apache.org>
Authored: Fri Dec 11 11:17:34 2015 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Fri Dec 11 11:17:34 2015 -0800

----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      |  64 ++++---
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  11 +-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   5 -
 .../src/main/resources/mapred-default.xml       |   7 -
 .../mapred/TestMRTimelineEventHandling.java     |   5 +-
 .../hadoop/mapreduce/v2/MiniMRYarnCluster.java  |   2 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |  58 +++++-
 .../distributedshell/ApplicationMaster.java     | 191 ++++++++-----------
 .../applications/distributedshell/Client.java   |  16 --
 .../distributedshell/TestDistributedShell.java  |  14 +-
 .../impl/pb/AllocateResponsePBImpl.java         |   4 +-
 .../hadoop/yarn/client/api/TimelineClient.java  |  18 +-
 .../client/api/impl/TimelineClientImpl.java     |  11 +-
 .../src/main/resources/yarn-default.xml         |   5 +-
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  10 +-
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  10 +-
 .../hadoop/yarn/server/nodemanager/Context.java |   3 +-
 .../yarn/server/nodemanager/NodeManager.java    |  23 ++-
 .../nodemanager/NodeStatusUpdaterImpl.java      |  48 +++--
 .../collectormanager/NMCollectorService.java    |  10 +-
 .../containermanager/ContainerManagerImpl.java  |  59 ++++--
 .../application/ApplicationImpl.java            |  70 +++++--
 .../monitor/ContainersMonitorImpl.java          |  11 +-
 .../timelineservice/NMTimelinePublisher.java    |  49 ++---
 .../TestContainerManagerRecovery.java           |  11 +-
 .../application/TestApplication.java            |   3 +-
 .../nodemanager/webapp/TestNMWebServices.java   |   9 +-
 .../ApplicationMasterService.java               |  11 +-
 .../server/resourcemanager/ClientRMService.java |  35 ++--
 .../server/resourcemanager/RMAppManager.java    |   7 +-
 .../server/resourcemanager/ResourceManager.java |  33 ++--
 .../resourcemanager/ResourceTrackerService.java |  21 +-
 .../resourcemanager/amlauncher/AMLauncher.java  |  15 +-
 .../metrics/TimelineServiceV2Publisher.java     |   2 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  16 +-
 .../resourcemanager/TestClientRMService.java    |   3 +
 .../metrics/TestSystemMetricsPublisher.java     |   2 +-
 .../TestSystemMetricsPublisherForV2.java        |   1 +
 .../TestTimelineServiceClientIntegration.java   |  30 ++-
 .../PerNodeTimelineCollectorsAuxService.java    |  15 +-
 .../reader/TimelineReaderServer.java            |  14 +-
 ...TestPerNodeTimelineCollectorsAuxService.java |   9 +-
 .../reader/TestTimelineReaderServer.java        |   3 +
 .../reader/TestTimelineReaderWebServices.java   |   2 +
 ...stTimelineReaderWebServicesHBaseStorage.java |   2 +
 45 files changed, 540 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index f52e654..19699fb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -19,9 +19,6 @@
 package org.apache.hadoop.mapreduce.jobhistory;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -30,7 +27,11 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,7 +47,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -56,9 +56,9 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
-import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -74,9 +74,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * The job history events get routed to this class. This class writes the Job
  * history events to the DFS directly into a staging dir and then moved to a
@@ -122,20 +121,17 @@ public class JobHistoryEventHandler extends AbstractService
 
   protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
-  
-  // For posting entities in new timeline service in a non-blocking way
-  // TODO YARN-3367 replace with event loop in TimelineClient.
-  private static ExecutorService threadPool =
-      Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-          .build());
 
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
   protected TimelineClient timelineClient;
   
-  private boolean newTimelineServiceEnabled = false;
+  private boolean timelineServiceV2Enabled = false;
+
+  // For posting entities in new timeline service in a non-blocking way
+  // TODO YARN-3367 replace with event loop in TimelineClient.
+  private ExecutorService threadPool;
 
   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@@ -265,22 +261,26 @@ public class JobHistoryEventHandler extends AbstractService
     // configuration status: off, on_with_v1 or on_with_v2.
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
         MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
-      if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-        
+      LOG.info("Emitting job history data to the timeline service is enabled");
+      if (YarnConfiguration.timelineServiceEnabled(conf)) {
+
         timelineClient = 
             ((MRAppMaster.RunningAppContext)context).getTimelineClient();
         timelineClient.init(conf);
-        newTimelineServiceEnabled = conf.getBoolean(
-            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
-            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
-        LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
-        LOG.info("Emitting job history data to the timeline server is enabled");
+        timelineServiceV2Enabled =
+            YarnConfiguration.timelineServiceV2Enabled(conf);
+        LOG.info("Timeline service is enabled; version: " +
+            YarnConfiguration.getTimelineServiceVersion(conf));
+        if (timelineServiceV2Enabled) {
+          // initialize the thread pool for v.2 timeline service
+          threadPool = createThreadPool();
+        }
       } else {
         LOG.info("Timeline service is not enabled");
       }
     } else {
-      LOG.info("Emitting job history data to the timeline server is not enabled");
+      LOG.info("Emitting job history data to the timeline server is not " +
+          "enabled");
     }
 
     // Flag for setting
@@ -448,19 +448,27 @@ public class JobHistoryEventHandler extends AbstractService
     if (timelineClient != null) {
       timelineClient.stop();
     }
-    shutdownAndAwaitTermination();
+    if (threadPool != null) {
+      shutdownAndAwaitTermination();
+    }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
   }
   
   // TODO remove threadPool after adding non-blocking call in TimelineClient
-  private static void shutdownAndAwaitTermination() {
+  private ExecutorService createThreadPool() {
+    return Executors.newCachedThreadPool(
+      new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+      .build());
+  }
+
+  private void shutdownAndAwaitTermination() {
     threadPool.shutdown();
     try {
       if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
         threadPool.shutdownNow(); 
         if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
-            LOG.error("ThreadPool did not terminate");
+          LOG.error("ThreadPool did not terminate");
       }
     } catch (InterruptedException ie) {
       threadPool.shutdownNow();
@@ -622,7 +630,7 @@ public class JobHistoryEventHandler extends AbstractService
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
         if (timelineClient != null) {
-          if (newTimelineServiceEnabled) {
+          if (timelineServiceV2Enabled) {
             processEventForNewTimelineService(historyEvent, event.getJobID(),
                 event.getTimestamp());
           } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index dafb6e9..a7c9245 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1017,14 +1017,9 @@ public class MRAppMaster extends CompositeService {
       this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
       if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
               MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
-            && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-                YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-
-        boolean newTimelineServiceEnabled = conf.getBoolean(
-            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
-            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
-            
-        if (newTimelineServiceEnabled) {
+            && YarnConfiguration.timelineServiceEnabled(conf)) {
+
+        if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
           // create new version TimelineClient
           timelineClient = TimelineClient.createTimelineClient(
               appAttemptID.getApplicationId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 3ab6eeb..3d1e841 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -467,11 +467,6 @@ public interface MRJobConfig {
     "mapreduce.job.emit-timeline-data";
   public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
       false;
-  
-  public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
-      "mapreduce.job.new-timeline-service.enabled";
-  public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
-      false;
 
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 0585234..6ece048 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -618,13 +618,6 @@
     </description>
 </property>
 
- <property>
-    <name>mapreduce.job.new-timeline-service.enabled</name>
-    <value>false</value>
-    <description>Specifies if posting job and task events to new timeline service.
-    </description>
-</property>
-
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index b3ea26e..7b322e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -161,11 +161,10 @@ public class TestMRTimelineEventHandling {
     LOG.info("testMRNewTimelineServiceEventHandling start.");
     Configuration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // enable new timeline service
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
 
-    // enable new timeline serivce in MR side
-    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
-
     // enable aux-service based timeline collectors
     conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
     conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index 18a4c14..edb825d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -173,7 +173,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
     boolean enableTimelineAuxService = false;
     if (nmAuxServices != null) {
       for (String nmAuxService: nmAuxServices) {
-        if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) {
+        if (nmAuxService.equals(TIMELINE_AUX_SERVICE_NAME)) {
           enableTimelineAuxService = true;
           break;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0165593..6a3854a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -83,6 +83,10 @@ public class YarnConfiguration extends Configuration {
         new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
             NM_CLIENT_MAX_NM_PROXIES)
     });
+    Configuration.addDeprecations(new DeprecationDelta[] {
+        new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+            SYSTEM_METRICS_PUBLISHER_ENABLED)
+    });
   }
 
   //Configurations
@@ -383,7 +387,8 @@ public class YarnConfiguration extends Configuration {
 
   /**
    *  The setting that controls whether yarn system metrics is published on the
-   *  timeline server or not by RM. This configuration setting is for ATS V1
+   *  timeline server or not by RM. This configuration setting is for ATS V1.
+   *  This is now deprecated in favor of SYSTEM_METRICS_PUBLISHER_ENABLED.
    */
   public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
       + "system-metrics-publisher.enabled";
@@ -2344,13 +2349,52 @@ public class YarnConfiguration extends Configuration {
     }
     return clusterId;
   }
-  
-  public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+
+  // helper methods for timeline service configuration
+  /**
+   * Returns whether the timeline service is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service is enabled.
+   */
+  public static boolean timelineServiceEnabled(Configuration conf) {
     return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
-        && conf.getBoolean(
-            YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-            YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);  
+      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+  }
+
+  /**
+   * Returns the timeline service version. It does not check whether the
+   * timeline service itself is enabled.
+   *
+   * @param conf the configuration
+   * @return the timeline service version as a float.
+   */
+  public static float getTimelineServiceVersion(Configuration conf) {
+    return conf.getFloat(TIMELINE_SERVICE_VERSION,
+        DEFAULT_TIMELINE_SERVICE_VERSION);
+  }
+
+  /**
+   * Returns whether the timeline service v.2 is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service v.2 is enabled. V.2 refers to a
+   * version greater than or equal to 2 but smaller than 3.
+   */
+  public static boolean timelineServiceV2Enabled(Configuration conf) {
+    return timelineServiceEnabled(conf) &&
+        (int)getTimelineServiceVersion(conf) == 2;
+  }
+
+  /**
+   * Returns whether the system metrics publisher is enabled.
+   *
+   * @param conf the configuration
+   * @return whether the system metrics publisher is enabled.
+   */
+  public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
   }
 
   /* For debugging. mp configurations to system output as XML format. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index e27c947..380ba29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -217,14 +217,11 @@ public class ApplicationMaster {
   // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
 
-  private boolean newTimelineService = false;
+  private boolean timelineServiceV2 = false;
 
   // For posting entities in new timeline service in a non-blocking way
   // TODO replace with event loop in TimelineClient.
-  private static ExecutorService threadPool =
-      Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-          .build());
+  private ExecutorService threadPool;
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -314,8 +311,10 @@ public class ApplicationMaster {
       }
       appMaster.run();
       result = appMaster.finish();
-      
-      shutdownAndAwaitTermination();
+
+      if (appMaster.threadPool != null) {
+        appMaster.shutdownAndAwaitTermination();
+      }
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       LogManager.shutdown();
@@ -329,16 +328,22 @@ public class ApplicationMaster {
       System.exit(2);
     }
   }
-  
+
   //TODO remove threadPool after adding non-blocking call in TimelineClient
-  private static void shutdownAndAwaitTermination() {
+  private ExecutorService createThreadPool() {
+    return Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+        .build());
+  }
+
+  private void shutdownAndAwaitTermination() {
     threadPool.shutdown();
     try {
       // Wait a while for existing tasks to terminate
       if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
         threadPool.shutdownNow();
         if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
-            LOG.error("ThreadPool did not terminate");
+          LOG.error("ThreadPool did not terminate");
       }
     } catch (InterruptedException ie) {
       threadPool.shutdownNow();
@@ -404,8 +409,7 @@ public class ApplicationMaster {
         "No. of containers on which the shell command needs to be executed");
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("debug", false, "Dump out debug information");
-    opts.addOption("timeline_service_version", true,
-        "Version for timeline service");
+
     opts.addOption("help", false, "Print usage");
     CommandLine cliParser = new GnuParser().parse(opts, args);
 
@@ -542,27 +546,15 @@ public class ApplicationMaster {
     requestPriority = Integer.parseInt(cliParser
         .getOptionValue("priority", "0"));
 
-    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-      if (cliParser.hasOption("timeline_service_version")) {
-        String timelineServiceVersion =
-            cliParser.getOptionValue("timeline_service_version", "v1");
-        if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
-          newTimelineService = false;
-        } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) {
-          newTimelineService = true;
-        } else {
-          throw new IllegalArgumentException(
-              "timeline_service_version is not set properly, should be 'v1' or 'v2'");
-        }
+    if (YarnConfiguration.timelineServiceEnabled(conf)) {
+      timelineServiceV2 =
+          YarnConfiguration.timelineServiceV2Enabled(conf);
+      if (timelineServiceV2) {
+        threadPool = createThreadPool();
       }
     } else {
       timelineClient = null;
       LOG.warn("Timeline service is not enabled");
-      if (cliParser.hasOption("timeline_service_version")) {
-        throw new IllegalArgumentException(
-            "Timeline service is not enabled");
-      }
     }
 
     return true;
@@ -623,16 +615,16 @@ public class ApplicationMaster {
     nmClientAsync.start();
 
     startTimelineClient(conf);
-    // need to bind timelineClient
-    amRMClient.registerTimelineClient(timelineClient);
+    if (timelineServiceV2) {
+      // need to bind timelineClient
+      amRMClient.registerTimelineClient(timelineClient);
+    }
     if(timelineClient != null) {
-      if (newTimelineService) {
-        publishApplicationAttemptEventOnNewTimelineService(timelineClient, 
-            appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, 
-            appSubmitterUgi);
+      if (timelineServiceV2) {
+        publishApplicationAttemptEventOnTimelineServiceV2(
+            DSEvent.DS_APP_ATTEMPT_START);
       } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
+        publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_START);
       }
     }
 
@@ -703,10 +695,9 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
-          if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+          if (YarnConfiguration.timelineServiceEnabled(conf)) {
             // Creating the Timeline Client
-            if (newTimelineService) {
+            if (timelineServiceV2) {
               timelineClient = TimelineClient.createTimelineClient(
                   appAttemptID.getApplicationId());
             } else {
@@ -742,13 +733,11 @@ public class ApplicationMaster {
     }
 
     if (timelineClient != null) {
-      if (newTimelineService) {
-        publishApplicationAttemptEventOnNewTimelineService(timelineClient,
-          appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId,
-          appSubmitterUgi);
+      if (timelineServiceV2) {
+        publishApplicationAttemptEventOnTimelineServiceV2(
+            DSEvent.DS_APP_ATTEMPT_END);
       } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+        publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_END);
       }
     }
 
@@ -855,12 +844,10 @@ public class ApplicationMaster {
               + containerStatus.getContainerId());
         }
         if(timelineClient != null) {
-          if (newTimelineService) {
-            publishContainerEndEventOnNewTimelineService(
-                timelineClient, containerStatus, domainId, appSubmitterUgi);
+          if (timelineServiceV2) {
+            publishContainerEndEventOnTimelineServiceV2(containerStatus);
           } else {
-            publishContainerEndEvent(
-                timelineClient, containerStatus, domainId, appSubmitterUgi);
+            publishContainerEndEvent(containerStatus);
           }
         }
       }
@@ -981,14 +968,11 @@ public class ApplicationMaster {
         applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
       }
       if(applicationMaster.timelineClient != null) {
-        if (applicationMaster.newTimelineService) {
-            ApplicationMaster.publishContainerStartEventOnNewTimelineService(
-                applicationMaster.timelineClient, container,
-                applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        if (applicationMaster.timelineServiceV2) {
+            applicationMaster.publishContainerStartEventOnTimelineServiceV2(
+                container);
         } else {
-          ApplicationMaster.publishContainerStartEvent(
-              applicationMaster.timelineClient, container,
-              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+          applicationMaster.publishContainerStartEvent(container);
         }
       }
     }
@@ -1195,14 +1179,12 @@ public class ApplicationMaster {
     }
   }
   
-  private static void publishContainerStartEvent(
-      final TimelineClient timelineClient, Container container, String domainId,
-      UserGroupInformation ugi) {
+  private void publishContainerStartEvent(Container container) {
     final TimelineEntity entity = new TimelineEntity();
     entity.setEntityId(container.getId().toString());
     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
     TimelineEvent event = new TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
     event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@@ -1211,12 +1193,13 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
-        @Override
-        public TimelinePutResponse run() throws Exception {
-          return timelineClient.putEntities(entity);
-        }
-      });
+      appSubmitterUgi.doAs(
+          new PrivilegedExceptionAction<TimelinePutResponse>() {
+            @Override
+            public TimelinePutResponse run() throws Exception {
+              return timelineClient.putEntities(entity);
+            }
+        });
     } catch (Exception e) {
       LOG.error("Container start event could not be published for "
           + container.getId().toString(),
@@ -1224,14 +1207,12 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishContainerEndEvent(
-      final TimelineClient timelineClient, ContainerStatus container,
-      String domainId, UserGroupInformation ugi) {
+  private void publishContainerEndEvent(ContainerStatus container) {
     final TimelineEntity entity = new TimelineEntity();
     entity.setEntityId(container.getContainerId().toString());
     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
     TimelineEvent event = new TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
     event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@@ -1246,14 +1227,12 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishApplicationAttemptEvent(
-      final TimelineClient timelineClient, String appAttemptId,
-      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
+  private void publishApplicationAttemptEvent(DSEvent appEvent) {
     final TimelineEntity entity = new TimelineEntity();
-    entity.setEntityId(appAttemptId);
+    entity.setEntityId(appAttemptID.toString());
     entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
     TimelineEvent event = new TimelineEvent();
     event.setEventType(appEvent.toString());
     event.setTimestamp(System.currentTimeMillis());
@@ -1264,7 +1243,7 @@ public class ApplicationMaster {
       LOG.error("App Attempt "
           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
           + " event could not be published for "
-          + appAttemptId.toString(), e);
+          + appAttemptID, e);
     }
   }
 
@@ -1296,27 +1275,24 @@ public class ApplicationMaster {
     return new Thread(runnableLaunchContainer);
   }
   
-  private static void publishContainerStartEventOnNewTimelineService(
-      final TimelineClient timelineClient, final Container container,
-      final String domainId, final UserGroupInformation ugi) {
+  private void publishContainerStartEventOnTimelineServiceV2(
+      final Container container) {
     Runnable publishWrapper = new Runnable() {
       public void run() {
-        publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
-            container, domainId, ugi);
+        publishContainerStartEventOnTimelineServiceV2Base(container);
       }
     };
     threadPool.execute(publishWrapper);
   }
 
-  private static void publishContainerStartEventOnNewTimelineServiceBase(
-      final TimelineClient timelineClient, Container container, String domainId,
-      UserGroupInformation ugi) {
+  private void publishContainerStartEventOnTimelineServiceV2Base(
+      Container container) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getId().toString());
     entity.setType(DSEntity.DS_CONTAINER.toString());
     //entity.setDomainId(domainId);
-    entity.addInfo("user", ugi.getShortUserName());
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
 
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
@@ -1327,7 +1303,7 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
           timelineClient.putEntities(entity);
@@ -1341,27 +1317,24 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishContainerEndEventOnNewTimelineService(
-      final TimelineClient timelineClient, final ContainerStatus container,
-      final String domainId, final UserGroupInformation ugi) {
+  private void publishContainerEndEventOnTimelineServiceV2(
+      final ContainerStatus container) {
     Runnable publishWrapper = new Runnable() {
       public void run() {
-          publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
-              container, domainId, ugi);
+          publishContainerEndEventOnTimelineServiceV2Base(container);
       }
     };
     threadPool.execute(publishWrapper);
   }
 
-  private static void publishContainerEndEventOnNewTimelineServiceBase(
-      final TimelineClient timelineClient, final ContainerStatus container,
-      final String domainId, final UserGroupInformation ugi) {
+  private void publishContainerEndEventOnTimelineServiceV2Base(
+      final ContainerStatus container) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getContainerId().toString());
     entity.setType(DSEntity.DS_CONTAINER.toString());
     //entity.setDomainId(domainId);
-    entity.addInfo("user", ugi.getShortUserName());
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
         new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
@@ -1371,7 +1344,7 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
           timelineClient.putEntities(entity);
@@ -1385,29 +1358,25 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishApplicationAttemptEventOnNewTimelineService(
-      final TimelineClient timelineClient, final String appAttemptId,
-      final DSEvent appEvent, final String domainId,
-      final UserGroupInformation ugi) {
+  private void publishApplicationAttemptEventOnTimelineServiceV2(
+      final DSEvent appEvent) {
 
     Runnable publishWrapper = new Runnable() {
       public void run() {
-        publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
-            appAttemptId, appEvent, domainId, ugi);
+        publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
       }
     };
     threadPool.execute(publishWrapper);
   }
 
-  private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
-      final TimelineClient timelineClient, String appAttemptId,
-      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
+  private void publishApplicationAttemptEventOnTimelineServiceV2Base(
+      DSEvent appEvent) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
-    entity.setId(appAttemptId);
+    entity.setId(appAttemptID.toString());
     entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
     //entity.setDomainId(domainId);
-    entity.addInfo("user", ugi.getShortUserName());
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
     event.setId(appEvent.toString());
@@ -1415,7 +1384,7 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
           timelineClient.putEntities(entity);
@@ -1426,7 +1395,7 @@ public class ApplicationMaster {
       LOG.error("App Attempt "
           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
           + " event could not be published for "
-          + appAttemptId.toString(),
+          + appAttemptID,
           e instanceof UndeclaredThrowableException ? e.getCause() : e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 2819c91..e66005e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -192,8 +192,6 @@ public class Client {
   // Command line options
   private Options opts;
 
-  private String timelineServiceVersion;
-
   private static final String shellCommandPath = "shellCommands";
   private static final String shellArgsPath = "shellArgs";
   private static final String appMasterJarPath = "AppMaster.jar";
@@ -269,7 +267,6 @@ public class Client {
     opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
     opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
     opts.addOption("log_properties", true, "log4j.properties file");
-    opts.addOption("timeline_service_version", true, "Version for timeline service");
     opts.addOption("keep_containers_across_application_attempts", false,
       "Flag to indicate whether to keep containers across application attempts." +
       " If the flag is true, running containers will not be killed when" +
@@ -371,16 +368,6 @@ public class Client {
           + " Specified virtual cores=" + amVCores);
     }
 
-    if (cliParser.hasOption("timeline_service_version")) {
-      timelineServiceVersion =
-        cliParser.getOptionValue("timeline_service_version", "v1");
-      if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") ||
-          timelineServiceVersion.trim().equalsIgnoreCase("v2"))) {
-        throw new IllegalArgumentException(
-              "timeline_service_version is not set properly, should be 'v1' or 'v2'");
-      }
-    }
-
     if (!cliParser.hasOption("jar")) {
       throw new IllegalArgumentException("No jar file specified for application master");
     }		
@@ -690,9 +677,6 @@ public class Client {
       vargs.add("--debug");
     }
 
-    if (timelineServiceVersion != null) {
-      vargs.add("--timeline_service_version " + timelineServiceVersion);
-    }
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index fe817c3..b3ff9b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -99,22 +99,19 @@ public class TestDistributedShell {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
 
     if (!testName.getMethodName().toLowerCase().contains("v2")) {
       // disable aux-service based timeline collectors
       conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
-      conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-          true);
-      conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false);
     } else {
+      // set version to 2
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
       // enable aux-service based timeline collectors
       conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
       conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
           + TIMELINE_AUX_SERVICE_NAME + ".class",
           PerNodeTimelineCollectorsAuxService.class.getName());
-      conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-          false);
     }
     conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
     conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
@@ -245,12 +242,7 @@ public class TestDistributedShell {
     }
     boolean isTestingTimelineV2 = false;
     if (timelineVersion.equalsIgnoreCase("v2")) {
-      String[] timelineArgs = {
-          "--timeline_service_version",
-          "v2"
-      };
       isTestingTimelineV2 = true;
-      args = mergeArgs(args, timelineArgs);
       if (!defaultFlow) {
         String[] flowArgs = {
             "--flow_name",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 7176146..d096a6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -403,7 +403,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   }
 
   @Override
-  public Priority getApplicationPriority() {
+  public synchronized Priority getApplicationPriority() {
     AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (this.appPriority != null) {
       return this.appPriority;
@@ -416,7 +416,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   }
 
   @Override
-  public void setApplicationPriority(Priority priority) {
+  public synchronized void setApplicationPriority(Priority priority) {
     maybeInitBuilder();
     if (priority == null)
       builder.clearApplicationPriority();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index f2707ba..9772dc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -48,17 +48,21 @@ public abstract class TimelineClient extends AbstractService {
    * current user may use {@link UserGroupInformation#doAs} another user to
    * construct and initialize a timeline client if the following operations are
    * supposed to be conducted by that user.
-   *
-   * @return a timeline client
    */
   protected ApplicationId contextAppId;
 
+  /**
+   * Creates an instance of the timeline v.1.x client.
+   */
   @Public
   public static TimelineClient createTimelineClient() {
     TimelineClient client = new TimelineClientImpl();
     return client;
   }
 
+  /**
+   * Creates an instance of the timeline v.2 client.
+   */
   @Public
   public static TimelineClient createTimelineClient(ApplicationId appId) {
     TimelineClient client = new TimelineClientImpl(appId);
@@ -156,8 +160,9 @@ public abstract class TimelineClient extends AbstractService {
   /**
    * <p>
    * Send the information of a number of conceptual entities to the timeline
-   * aggregator. It is a blocking API. The method will not return until all the
-   * put entities have been persisted.
+   * service v.2 collector. It is a blocking API. The method will not return
+   * until all the put entities have been persisted. If this method is invoked
+   * for a non-v.2 timeline client instance, a YarnException is thrown.
    * </p>
    *
    * @param entities
@@ -173,8 +178,9 @@ public abstract class TimelineClient extends AbstractService {
   /**
    * <p>
    * Send the information of a number of conceptual entities to the timeline
-   * aggregator. It is an asynchronous API. The method will return once all the
-   * entities are received.
+   * service v.2 collector. It is an asynchronous API. The method will return
+   * once all the entities are received. If this method is invoked for a
+   * non-v.2 timeline client instance, a YarnException is thrown.
    * </p>
    *
    * @param entities

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 8312b6d..3a624ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -124,7 +124,7 @@ public class TimelineClientImpl extends TimelineClient {
   private int maxServiceRetries;
   private long serviceRetryInterval;
 
-  private boolean newTimelineService = false;
+  private boolean timelineServiceV2 = false;
 
   @Private
   @VisibleForTesting
@@ -270,7 +270,7 @@ public class TimelineClientImpl extends TimelineClient {
 
   public TimelineClientImpl(ApplicationId applicationId) {
     super(TimelineClientImpl.class.getName(), applicationId);
-    this.newTimelineService = true;
+    this.timelineServiceV2 = true;
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -299,13 +299,13 @@ public class TimelineClientImpl extends TimelineClient {
         new TimelineURLConnectionFactory()), cc);
     TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
     // TODO need to cleanup filter retry later.
-    if (!newTimelineService) {
+    if (!timelineServiceV2) {
       client.addFilter(retryFilter);
     }
 
     // old version timeline service need to get address from configuration
     // while new version need to auto discovery (with retry).
-    if (newTimelineService) {
+    if (timelineServiceV2) {
       maxServiceRetries = conf.getInt(
           YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
@@ -353,6 +353,9 @@ public class TimelineClientImpl extends TimelineClient {
   private void putEntities(boolean async,
       org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
       throws IOException, YarnException {
+    if (!timelineServiceV2) {
+      throw new YarnException("v.2 method is invoked on a v.1.x client");
+    }
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
         entitiesContainer =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 99f84e6..a9adbbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -771,14 +771,15 @@
   <property>
     <description>The setting that controls whether yarn system metrics is
     published to the Timeline server (version one) or not, by RM. 
-    This configuration is deprecated.</description>
+    This configuration is now deprecated in favor of
+    yarn.system-metrics-publisher.enabled.</description>
     <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>
 
   <property>
     <description>The setting that controls whether yarn system metrics is
-    published on the Timeline server (version two) or not by RM And NM.</description>
+    published to the Timeline service or not, by the RM and NM.</description>
     <name>yarn.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index fa0cf5c..066abfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -261,10 +261,12 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private void initRegisteredCollectors() {
     NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
     List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList();
-    this.registeredCollectors = new HashMap<ApplicationId, String> ();
-    for (AppCollectorsMapProto c : list) {
-      ApplicationId appId = convertFromProtoFormat(c.getAppId());
-      this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+    if (!list.isEmpty()) {
+      this.registeredCollectors = new HashMap<>();
+      for (AppCollectorsMapProto c : list) {
+        ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 2521b9c..151006b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -531,10 +531,12 @@ public class NodeHeartbeatResponsePBImpl extends
   private void initAppCollectorsMap() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     List<AppCollectorsMapProto> list = p.getAppCollectorsMapList();
-    this.appCollectorsMap = new HashMap<ApplicationId, String> ();
-    for (AppCollectorsMapProto c : list) {
-      ApplicationId appId = convertFromProtoFormat(c.getAppId());
-      this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+    if (!list.isEmpty()) {
+      this.appCollectorsMap = new HashMap<>();
+      for (AppCollectorsMapProto c : list) {
+        ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 0b378a1..8fce422 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -64,7 +64,8 @@ public interface Context {
 
   /**
    * Get the registered collectors that located on this NM.
-   * @return registered
+   * @return registered collectors, or null if the timeline service v.2 is not
+   * enabled
    */
   Map<ApplicationId, String> getRegisteredCollectors();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 601bd04..da8a13a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -97,6 +97,7 @@ public class NodeManager extends CompositeService
   private Context context;
   private AsyncDispatcher dispatcher;
   private ContainerManagerImpl containerManager;
+  // the NM collector service is set only if the timeline service v.2 is enabled
   private NMCollectorService nmCollectorService;
   private NodeStatusUpdater nodeStatusUpdater;
   private NodeResourceMonitor nodeResourceMonitor;
@@ -356,8 +357,10 @@ public class NodeManager extends CompositeService
 
     DefaultMetricsSystem.initialize("NodeManager");
 
-    this.nmCollectorService = createNMCollectorService(context);
-    addService(nmCollectorService);
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      this.nmCollectorService = createNMCollectorService(context);
+      addService(nmCollectorService);
+    }
 
     // StatusUpdater should be added last so that it get started last 
     // so that we make sure everything is up before registering with RM. 
@@ -457,8 +460,7 @@ public class NodeManager extends CompositeService
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
-    protected Map<ApplicationId, String> registeredCollectors =
-        new ConcurrentHashMap<ApplicationId, String>();
+    protected Map<ApplicationId, String> registeredCollectors;
 
     protected final ConcurrentMap<ContainerId,
         org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@@ -484,6 +486,9 @@ public class NodeManager extends CompositeService
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
         NMStateStoreService stateStore, Configuration conf) {
+      if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+        this.registeredCollectors = new ConcurrentHashMap<>();
+      }
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
       this.dirsHandler = dirsHandler;
@@ -709,7 +714,14 @@ public class NodeManager extends CompositeService
     return this.context;
   }
 
-  // For testing
+  /**
+   * Returns the NM collector service. It should be used only for testing
+   * purposes.
+   *
+   * @return the NM collector service, or null if the timeline service v.2 is
+   * not enabled
+   */
+  @VisibleForTesting
   NMCollectorService getNMCollectorService() {
     return this.nmCollectorService;
   }
@@ -717,6 +729,7 @@ public class NodeManager extends CompositeService
   public static void main(String[] args) throws IOException {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
+    @SuppressWarnings("resource")
     NodeManager nodeManager = new NodeManager();
     Configuration conf = new YarnConfiguration();
     new GenericOptionsParser(conf, args);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index d39204f..39c846c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -821,7 +821,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               dispatcher.getEventHandler().handle(
                   new CMgrSignalContainersEvent(containersToSignal));
             }
-            if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
+            if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
               updateTimelineClientsAddress(response);
             }
 
@@ -853,7 +853,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       /**
        * Caller should take care of sending non null nodelabels for both
        * arguments
-       * 
+       *
        * @param nodeLabelsNew
        * @param nodeLabelsOld
        * @return if the New node labels are diff from the older one.
@@ -869,27 +869,37 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
       private void updateTimelineClientsAddress(
           NodeHeartbeatResponse response) {
-        Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = 
-            response.getAppCollectorsMap().entrySet();
-        for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
-          ApplicationId appId = entry.getKey();
-          String collectorAddr = entry.getValue();
-
-          // Only handle applications running on local node.
-          // Not include apps with timeline collectors running in local
-          Application application = context.getApplications().get(appId);
-          if (application != null &&
-              !context.getRegisteredCollectors().containsKey(appId)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sync a new collector address: " + collectorAddr + 
-                  " for application: " + appId + " from RM.");
+        Map<ApplicationId, String> knownCollectorsMap =
+            response.getAppCollectorsMap();
+        if (knownCollectorsMap == null) {
+          LOG.warn("the collectors map is null");
+        } else {
+          Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
+              knownCollectorsMap.entrySet();
+          for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
+            ApplicationId appId = entry.getKey();
+            String collectorAddr = entry.getValue();
+
+            // Only handle applications running on the local node.
+            // Exclude apps whose timeline collectors run on the local node.
+            Application application = context.getApplications().get(appId);
+            // TODO this logic could be problematic if the collector address
+            // gets updated due to NM restart or collector service failure
+            if (application != null &&
+                !context.getRegisteredCollectors().containsKey(appId)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sync a new collector address: " + collectorAddr +
+                    " for application: " + appId + " from RM.");
+              }
+              TimelineClient client = application.getTimelineClient();
+              if (client != null) {
+                client.setTimelineServiceAddress(collectorAddr);
+              }
             }
-            TimelineClient client = application.getTimelineClient();
-            client.setTimelineServiceAddress(collectorAddr);
           }
         }
       }
-      
+
       private void updateMasterKeys(NodeHeartbeatResponse response) {
         // See if the master-key has rolled over
         MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index db79ee5..3ba81ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -43,6 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 
+/**
+ * Service that handles collector information. It is used only if the timeline
+ * service v.2 is enabled.
+ */
 public class NMCollectorService extends CompositeService implements
     CollectorNodemanagerProtocol {
 
@@ -113,9 +117,9 @@ public class NMCollectorService extends CompositeService implements
         String collectorAddr = collector.getCollectorAddr();
         newCollectorsMap.put(appId, collectorAddr);
         // set registered collector address to TimelineClient.
-        if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
-          TimelineClient client = 
-              context.getApplications().get(appId).getTimelineClient();
+        TimelineClient client =
+            context.getApplications().get(appId).getTimelineClient();
+        if (client != null) {
           client.setTimelineServiceAddress(collectorAddr);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index b010eee..e4668c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -190,7 +191,8 @@ public class ContainerManagerImpl extends CompositeService implements
 
   private long waitForContainersOnShutdownMillis;
 
-  private final NMTimelinePublisher nmMetricsPublisher;
+  // NM metrics publisher is set only if the timeline service v.2 is enabled
+  private NMTimelinePublisher nmMetricsPublisher;
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -218,8 +220,15 @@ public class ContainerManagerImpl extends CompositeService implements
     auxiliaryServices.registerServiceListener(this);
     addService(auxiliaryServices);
 
-    nmMetricsPublisher = createNMTimelinePublisher(context);
-    context.setNMTimelinePublisher(nmMetricsPublisher);
+    // initialize the metrics publisher if the timeline service v.2 is enabled
+    // and the system publisher is enabled
+    Configuration conf = context.getConf();
+    if (YarnConfiguration.timelineServiceV2Enabled(conf) &&
+        YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+      LOG.info("YARN system metrics publishing service is enabled");
+      nmMetricsPublisher = createNMTimelinePublisher(context);
+      context.setNMTimelinePublisher(nmMetricsPublisher);
+    }
     this.containersMonitor =
         new ContainersMonitorImpl(exec, dispatcher, this.context);
     addService(this.containersMonitor);
@@ -237,7 +246,6 @@ public class ContainerManagerImpl extends CompositeService implements
     
     addService(dispatcher);
 
-
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -335,7 +343,7 @@ public class ContainerManagerImpl extends CompositeService implements
     LOG.info("Recovering application " + appId);
     //TODO: Recover flow and flow run ID
     ApplicationImpl app = new ApplicationImpl(
-        dispatcher, p.getUser(), null, null, 0, appId, creds, context);
+        dispatcher, p.getUser(), appId, creds, context);
     context.getApplications().put(appId, app);
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
@@ -941,20 +949,27 @@ public class ContainerManagerImpl extends CompositeService implements
     try {
       if (!serviceStopped) {
         // Create the application
-        String flowName = launchContext.getEnvironment().get(
-            TimelineUtils.FLOW_NAME_TAG_PREFIX);
-        String flowVersion = launchContext.getEnvironment().get(
-            TimelineUtils.FLOW_VERSION_TAG_PREFIX);
-        String flowRunIdStr = launchContext.getEnvironment().get(
-            TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
-        long flowRunId = 0L;
-        if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
-          flowRunId = Long.parseLong(flowRunIdStr);
+        // populate the flow context from the launch context if the timeline
+        // service v.2 is enabled
+        FlowContext flowContext = null;
+        if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+          String flowName = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_NAME_TAG_PREFIX);
+          String flowVersion = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+          String flowRunIdStr = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+          long flowRunId = 0L;
+          if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+            flowRunId = Long.parseLong(flowRunIdStr);
+          }
+          flowContext =
+              new FlowContext(flowName, flowVersion, flowRunId);
         }
         if (!context.getApplications().containsKey(applicationID)) {
           Application application =
-              new ApplicationImpl(dispatcher, user, flowName, flowVersion,
-                  flowRunId, applicationID, credentials, context);
+              new ApplicationImpl(dispatcher, user, flowContext,
+                  applicationID, credentials, context);
           if (context.getApplications().putIfAbsent(applicationID,
               application) == null) {
             LOG.info("Creating a new application reference for app "
@@ -1310,7 +1325,9 @@ public class ContainerManagerImpl extends CompositeService implements
       Container c = containers.get(event.getContainerID());
       if (c != null) {
         c.handle(event);
-        nmMetricsPublisher.publishContainerEvent(event);
+        if (nmMetricsPublisher != null) {
+          nmMetricsPublisher.publishContainerEvent(event);
+        }
       } else {
         LOG.warn("Event " + event + " sent to absent container " +
             event.getContainerID());
@@ -1326,7 +1343,9 @@ public class ContainerManagerImpl extends CompositeService implements
               event.getApplicationID());
       if (app != null) {
         app.handle(event);
-        nmMetricsPublisher.publishApplicationEvent(event);
+        if (nmMetricsPublisher != null) {
+          nmMetricsPublisher.publishApplicationEvent(event);
+        }
       } else {
         LOG.warn("Event " + event + " sent to absent application "
             + event.getApplicationID());
@@ -1349,7 +1368,9 @@ public class ContainerManagerImpl extends CompositeService implements
     @Override
     public void handle(LocalizationEvent event) {
       origLocalizationEventHandler.handle(event);
-      timelinePublisher.publishLocalizationEvent(event);
+      if (timelinePublisher != null) {
+        timelinePublisher.publishLocalizationEvent(event);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6e87cfd..93c6758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -67,9 +67,8 @@ public class ApplicationImpl implements Application {
 
   final Dispatcher dispatcher;
   final String user;
-  final String flowName;
-  final String flowVersion;
-  final long flowRunId;
+  // flow context is set only if the timeline service v.2 is enabled
+  private FlowContext flowContext;
   final ApplicationId appId;
   final Credentials credentials;
   Map<ApplicationAccessType, String> applicationACLs;
@@ -86,14 +85,16 @@ public class ApplicationImpl implements Application {
   Map<ContainerId, Container> containers =
       new HashMap<ContainerId, Container>();
 
-  public ApplicationImpl(Dispatcher dispatcher, String user, String flowName,
-      String flowVersion, long flowRunId, ApplicationId appId,
-      Credentials credentials, Context context) {
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      ApplicationId appId, Credentials credentials, Context context) {
+    this(dispatcher, user, null, appId, credentials, context);
+  }
+
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      FlowContext flowContext, ApplicationId appId, Credentials credentials,
+      Context context) {
     this.dispatcher = dispatcher;
     this.user = user;
-    this.flowName = flowName;
-    this.flowVersion = flowVersion;
-    this.flowRunId = flowRunId;
     this.appId = appId;
     this.credentials = credentials;
     this.aclsManager = context.getApplicationACLsManager();
@@ -103,11 +104,44 @@ public class ApplicationImpl implements Application {
     writeLock = lock.writeLock();
     stateMachine = stateMachineFactory.make(this);
     Configuration conf = context.getConf();
-    if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
-      createAndStartTimelineClient(conf);
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      if (flowContext == null) {
+        throw new IllegalArgumentException("flow context cannot be null");
+      }
+      this.flowContext = flowContext;
+      if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+        createAndStartTimelineClient(conf);
+      }
     }
   }
-  
+
+  /**
+   * Data object that encapsulates the flow context of an application.
+   */
+  public static class FlowContext {
+    private final String flowName;
+    private final String flowVersion;
+    private final long flowRunId;
+
+    public FlowContext(String flowName, String flowVersion, long flowRunId) {
+      this.flowName = flowName;
+      this.flowVersion = flowVersion;
+      this.flowRunId = flowRunId;
+    }
+
+    public String getFlowName() {
+      return flowName;
+    }
+
+    public String getFlowVersion() {
+      return flowVersion;
+    }
+
+    public long getFlowRunId() {
+      return flowRunId;
+    }
+  }
+
   private void createAndStartTimelineClient(Configuration conf) {
     // create and start timeline client
     this.timelineClient = TimelineClient.createTimelineClient(appId);
@@ -454,7 +488,11 @@ public class ApplicationImpl implements Application {
       // Remove collectors info for finished apps.
       // TODO check we remove related collectors info in failure cases
       // (YARN-3038)
-      app.context.getRegisteredCollectors().remove(app.getAppId());
+      Map<ApplicationId, String> registeredCollectors =
+          app.context.getRegisteredCollectors();
+      if (registeredCollectors != null) {
+        registeredCollectors.remove(app.getAppId());
+      }
       // stop timelineClient when application get finished.
       TimelineClient timelineClient = app.getTimelineClient();
       if (timelineClient != null) {
@@ -521,16 +559,16 @@ public class ApplicationImpl implements Application {
 
   @Override
   public String getFlowName() {
-    return flowName;
+    return flowContext == null ? null : flowContext.getFlowName();
   }
 
   @Override
   public String getFlowVersion() {
-    return flowVersion;
+    return flowContext == null ? null : flowContext.getFlowVersion();
   }
 
   @Override
   public long getFlowRunId() {
-    return flowRunId;
+    return flowContext == null ? 0L : flowContext.getFlowRunId();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index dfa32ac..589cf75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
@@ -559,9 +560,13 @@ public class ContainersMonitorImpl extends AbstractService implements
 
             ContainerImpl container =
                 (ContainerImpl) context.getContainers().get(containerId);
-            container.getNMTimelinePublisher().reportContainerResourceUsage(
-                container, currentTime, pId, currentPmemUsage,
-                cpuUsageTotalCoresPercentage);
+            NMTimelinePublisher nmMetricsPublisher =
+                container.getNMTimelinePublisher();
+            if (nmMetricsPublisher != null) {
+              nmMetricsPublisher.reportContainerResourceUsage(
+                  container, currentTime, pId, currentPmemUsage,
+                  cpuUsageTotalCoresPercentage);
+            }
           } catch (Exception e) {
             // Log the exception and proceed to the next container.
             LOG.warn("Uncaught exception in ContainersMonitorImpl "