You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2015/12/11 20:19:28 UTC

[1/2] hadoop git commit: YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no impact when it's turned off. Contributed by Sangjin Lee.

Repository: hadoop
Updated Branches:
  refs/heads/feature-YARN-2928 0801563b4 -> 45510fc6b


http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 2c5c300..69de433 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -56,12 +55,16 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
+/**
+ * Metrics publisher service that publishes data to the timeline service v.2. It
+ * is used only if the timeline service v.2 is enabled and the system publishing
+ * of events and metrics is enabled.
+ */
 public class NMTimelinePublisher extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class);
 
   private Dispatcher dispatcher;
-  private boolean publishSystemMetrics;
 
   private Context context;
 
@@ -76,24 +79,16 @@ public class NMTimelinePublisher extends CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetrics =
-        YarnConfiguration.systemMetricsPublisherEnabled(conf);
-
-    if (publishSystemMetrics) {
-      dispatcher = new AsyncDispatcher();
-      dispatcher.register(NMTimelineEventType.class,
-          new ForwardingEventHandler());
-      dispatcher
-          .register(ContainerEventType.class, new ContainerEventHandler());
-      dispatcher.register(ApplicationEventType.class,
-          new ApplicationEventHandler());
-      dispatcher.register(LocalizationEventType.class,
-          new LocalizationEventDispatcher());
-      addIfService(dispatcher);
-      LOG.info("YARN system metrics publishing service is enabled");
-    } else {
-      LOG.info("YARN system metrics publishing service is not enabled");
-    }
+    dispatcher = new AsyncDispatcher();
+    dispatcher.register(NMTimelineEventType.class,
+        new ForwardingEventHandler());
+    dispatcher
+        .register(ContainerEventType.class, new ContainerEventHandler());
+    dispatcher.register(ApplicationEventType.class,
+        new ApplicationEventHandler());
+    dispatcher.register(LocalizationEventType.class,
+        new LocalizationEventDispatcher());
+    addIfService(dispatcher);
     super.serviceInit(conf);
   }
 
@@ -121,8 +116,9 @@ public class NMTimelinePublisher extends CompositeService {
   public void reportContainerResourceUsage(Container container,
       long createdTime, String pId, Long pmemUsage,
       Float cpuUsageTotalCoresPercentage) {
-    if (publishSystemMetrics
-        && (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) {
+    if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
+        cpuUsageTotalCoresPercentage !=
+            ResourceCalculatorProcessTree.UNAVAILABLE) {
       ContainerEntity entity =
           createContainerEntity(container.getContainerId());
       long currentTimeMillis = System.currentTimeMillis();
@@ -219,9 +215,6 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   public void publishApplicationEvent(ApplicationEvent event) {
-    if (!publishSystemMetrics) {
-      return;
-    }
     // publish only when the desired event is received
     switch (event.getType()) {
     case INIT_APPLICATION:
@@ -242,9 +235,6 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   public void publishContainerEvent(ContainerEvent event) {
-    if (!publishSystemMetrics) {
-      return;
-    }
     // publish only when the desired event is received
     switch (event.getType()) {
     case INIT_CONTAINER:
@@ -262,9 +252,6 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   public void publishLocalizationEvent(LocalizationEvent event) {
-    if (!publishSystemMetrics) {
-      return;
-    }
     // publish only when the desired event is received
     switch (event.getType()) {
     case CONTAINER_RESOURCES_LOCALIZED:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index f29b791..c43777c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -95,7 +94,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -106,7 +104,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestContainerManagerRecovery extends BaseContainerManagerTest {
 
@@ -473,7 +470,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
       NMStateStoreService stateStore) {
     NMContext context = new NMContext(new NMContainerTokenSecretManager(
         conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore, conf){
+        new ApplicationACLsManager(conf), stateStore, conf) {
       public int getHttpPort() {
         return HTTP_PORT;
       }
@@ -638,9 +635,9 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
           }
 
           @Override
-          public NMTimelinePublisher createNMTimelinePublisher(Context context) {
-            NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class);
-            return timelinePublisher;
+          public NMTimelinePublisher
+              createNMTimelinePublisher(Context context) {
+            return null;
           }
     };
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 38b3172f..f31a98c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -535,7 +534,7 @@ public class TestApplication {
       this.appId = BuilderUtils.newApplicationId(timestamp, id);
 
       app = new ApplicationImpl(
-          dispatcher, this.user, null, null, 0, appId, null, context);
+          dispatcher, this.user, appId, null, context);
       containers = new ArrayList<Container>();
       for (int i = 0; i < numContainers; i++) {
         Container container = createMockedContainer(this.appId, i);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index ef5eb65..a6818ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -31,17 +31,14 @@ import javax.ws.rs.core.MediaType;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.NodeHealthScriptRunner;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -50,7 +47,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
@@ -64,6 +60,7 @@ import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.w3c.dom.Document;
@@ -327,7 +324,7 @@ public class TestNMWebServices extends JerseyTestBase {
     final String filename = "logfile1";
     final String logMessage = "log message\n";
     nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
-        null, null, 0, appId, null, nmContext));
+        appId, null, nmContext));
     
     MockContainer container = new MockContainer(appAttemptId,
         new AsyncDispatcher(), new Configuration(), "user", appId, 1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index dd817d0..9ace1fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -293,8 +293,11 @@ public class ApplicationMasterService extends AbstractService implements
 
     RMApp rmApp =
         rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
+
     // Remove collector address when app get finished.
-    rmApp.removeCollectorAddr();
+    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+      rmApp.removeCollectorAddr();
+    }
     // checking whether the app exits in RMStateStore at first not to throw
     // ApplicationDoesNotExistInCacheException before and after
     // RM work-preserving restart.
@@ -562,8 +565,10 @@ public class ApplicationMasterService extends AbstractService implements
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
 
       // add collector address for this application
-      allocateResponse.setCollectorAddr(
-          this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
+      if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+        allocateResponse.setCollectorAddr(
+            this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
+      }
 
       // add preemption to the allocateResponse message (if any)
       allocateResponse

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 213c226..04f4df2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -571,24 +571,27 @@ public class ClientRMService extends AbstractService implements
       throw RPCUtil.getRemoteException(ie);
     }
 
-    // Sanity check for flow run
-    String value = null;
-    try {
-      for (String tag : submissionContext.getApplicationTags()) {
-        if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
-            tag.startsWith(
-                TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
-          value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
-          Long.valueOf(value);
+    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+      // Sanity check for flow run
+      String value = null;
+      try {
+        for (String tag : submissionContext.getApplicationTags()) {
+          if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
+              tag.startsWith(
+                  TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
+            value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length()
+                + 1);
+            Long.valueOf(value);
+          }
         }
+      } catch (NumberFormatException e) {
+        LOG.warn("Invalid to flow run: " + value +
+            ". Flow run should be a long integer", e);
+        RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+            e.getMessage(), "ClientRMService",
+            "Exception in submitting application", applicationId);
+        throw RPCUtil.getRemoteException(e);
       }
-    } catch (NumberFormatException e) {
-      LOG.warn("Invalid to flow run: " + value +
-          ". Flow run should be a long integer", e);
-      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
-          e.getMessage(), "ClientRMService",
-          "Exception in submitting application", applicationId);
-      throw RPCUtil.getRemoteException(e);
     }
 
     // Check whether app has already been put into rmContext,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 261526e..546609b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -364,8 +364,11 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       LOG.warn(message);
       throw new YarnException(message);
     }
-    // Start timeline collector for the submitted app
-    application.startTimelineCollector();
+
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      // Start timeline collector for the submitted app
+      application.startTimelineCollector();
+    }
     // Inform the ACLs Manager
     this.applicationACLsManager.addApplication(applicationId,
         submissionContext.getAMContainerSpec().getApplicationACLs());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index a5de053..1fafc85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -375,18 +375,19 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
-    boolean timelineServiceEnabled =
-        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
-    SystemMetricsPublisher publisher = null;
-    if (timelineServiceEnabled) {
-      if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-          YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
-        LOG.info("TimelineService V1 is configured");
-        publisher = new TimelineServiceV1Publisher();
-      } else {
-        LOG.info("TimelineService V2 is configured");
+    SystemMetricsPublisher publisher;
+    if (YarnConfiguration.timelineServiceEnabled(conf) &&
+        YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+      if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+        // we're dealing with the v.2.x publisher
+        LOG.info("system metrics publisher with the timeline service V2 is " +
+            "configured");
         publisher = new TimelineServiceV2Publisher(rmContext);
+      } else {
+        // we're dealing with the v.1.x publisher
+        LOG.info("system metrics publisher with the timeline service V1 is " +
+            "configured");
+        publisher = new TimelineServiceV1Publisher();
       }
     } else {
       LOG.info("TimelineServicePublisher is not configured");
@@ -515,10 +516,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
       addService(rmApplicationHistoryWriter);
       rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
 
-      RMTimelineCollectorManager timelineCollectorManager =
-          createRMTimelineCollectorManager();
-      addService(timelineCollectorManager);
-      rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
+      if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
+        RMTimelineCollectorManager timelineCollectorManager =
+            createRMTimelineCollectorManager();
+        addService(timelineCollectorManager);
+        rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
+      }
 
       // Register event handler for NodesListManager
       nodesListManager = new NodesListManager(rmContext);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index e7eee6b..8cf65a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -445,10 +445,15 @@ public class ResourceTrackerService extends AbstractService implements
           message);
     }
 
-    // Check & update collectors info from request.
-    // TODO make sure it won't have race condition issue for AM failed over case
-    // that the older registration could possible override the newer one.
-    updateAppCollectorsMap(request);
+    boolean timelineV2Enabled =
+        YarnConfiguration.timelineServiceV2Enabled(getConfig());
+    if (timelineV2Enabled) {
+      // Check & update collectors info from request.
+      // TODO make sure there is no race condition in the AM failover case,
+      // where an older registration could possibly override the newer
+      // one.
+      updateAppCollectorsMap(request);
+    }
 
     // Heartbeat response
     NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -467,12 +472,12 @@ public class ResourceTrackerService extends AbstractService implements
       nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
     }
 
-    // Return collectors' map that NM needs to know
-    // TODO we should optimize this to only include collector info that NM
-    // doesn't know yet.
     List<ApplicationId> keepAliveApps =
         remoteNodeStatus.getKeepAliveApplications();
-    if (keepAliveApps != null) {
+    if (timelineV2Enabled && keepAliveApps != null) {
+      // Return collectors' map that NM needs to know
+      // TODO we should optimize this to only include collector info that NM
+      // doesn't know yet.
       setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index df760a3..9a7638c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -215,12 +216,14 @@ public class AMLauncher implements Runnable {
             .get(applicationId)
             .getSubmitTime()));
 
-    // Set flow context info
-    for (String tag :
-        rmContext.getRMApps().get(applicationId).getApplicationTags()) {
-      setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
-      setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
-      setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      // Set flow context info
+      for (String tag :
+          rmContext.getRMApps().get(applicationId).getApplicationTags()) {
+        setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
+        setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
+        setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
+      }
     }
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
index e0c593d..e83bcc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * This class is responsible for posting application, appattempt & Container
+ * This class is responsible for posting application, appattempt &amp; Container
  * lifecycle related events to timeline service V2
  */
 @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 05f682a..78f534f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -913,15 +913,17 @@ public class RMAppImpl implements RMApp, Recoverable {
       extends RMAppTransition {
 
     public void transition(RMAppImpl app, RMAppEvent event) {
-      LOG.info("Updating collector info for app: " + app.getApplicationId());
+      if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
+        LOG.info("Updating collector info for app: " + app.getApplicationId());
 
-      RMAppCollectorUpdateEvent appCollectorUpdateEvent =
-          (RMAppCollectorUpdateEvent) event;
-      // Update collector address
-      app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
+        RMAppCollectorUpdateEvent appCollectorUpdateEvent =
+            (RMAppCollectorUpdateEvent) event;
+        // Update collector address
+        app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
 
-      // TODO persistent to RMStateStore for recover
-      // Save to RMStateStore
+        // TODO persistent to RMStateStore for recover
+        // Save to RMStateStore
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 8215468..bdf7100 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -641,6 +641,7 @@ public class TestClientRMService {
     ClientRMService rmService =
         new ClientRMService(rmContext, yarnScheduler, appManager,
             mockAclsManager, mockQueueACLsManager, null);
+    rmService.init(new Configuration());
 
     // without name and queue
 
@@ -732,6 +733,7 @@ public class TestClientRMService {
     ClientRMService rmService =
         new ClientRMService(rmContext, yarnScheduler, appManager,
             mockAclsManager, mockQueueACLsManager, null);
+    rmService.init(new Configuration());
 
     // Initialize appnames and queues
     String[] queues = {QUEUE_1, QUEUE_2};
@@ -895,6 +897,7 @@ public class TestClientRMService {
     final ClientRMService rmService =
         new ClientRMService(rmContext, yarnScheduler, appManager, null, null,
             null);
+    rmService.init(new Configuration());
 
     // submit an app and wait for it to block while in app submission
     Thread t = new Thread() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index d6fdb3e..6dbf8c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -74,7 +74,7 @@ public class TestSystemMetricsPublisher {
   public static void setup() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
         MemoryTimelineStore.class, TimelineStore.class);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index 20a5b13..baaa566 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -131,6 +131,7 @@ public class TestSystemMetricsPublisherForV2 {
   private static Configuration getTimelineV2Conf() {
     Configuration conf = new Configuration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
     conf.setInt(
         YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 5672759..a734340 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -20,41 +20,55 @@ package org.apache.hadoop.yarn.server.timelineservice;
 
 
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.timelineservice.*;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
-import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-
 public class TestTimelineServiceClientIntegration {
   private static NodeTimelineCollectorManager collectorManager;
   private static PerNodeTimelineCollectorsAuxService auxService;
+  private static Configuration conf;
 
   @BeforeClass
   public static void setupClass() throws Exception {
     try {
       collectorManager = new MockNodeTimelineCollectorManager();
+      conf = new YarnConfiguration();
+      // enable timeline service v.2
+      conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
       auxService =
           PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
-              collectorManager);
+              collectorManager, conf);
       auxService.addApplication(ApplicationId.newInstance(0, 1));
     } catch (ExitUtil.ExitException e) {
       fail();
@@ -76,7 +90,7 @@ public class TestTimelineServiceClientIntegration {
       // set the timeline service address manually
       client.setTimelineServiceAddress(
           collectorManager.getRestServerBindAddress());
-      client.init(new YarnConfiguration());
+      client.init(conf);
       client.start();
       TimelineEntity entity = new TimelineEntity();
       entity.setType("test entity type");
@@ -103,7 +117,7 @@ public class TestTimelineServiceClientIntegration {
       // set the timeline service address manually
       client.setTimelineServiceAddress(
           collectorManager.getRestServerBindAddress());
-      client.init(new YarnConfiguration());
+      client.init(conf);
       client.start();
       ClusterEntity cluster = new ClusterEntity();
       cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index 4147d42..0319e34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ContainerContext;
 import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
 import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
 import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -68,6 +67,9 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      throw new YarnException("Timeline service v2 is not enabled");
+    }
     collectorManager.init(conf);
     super.serviceInit(conf);
   }
@@ -175,7 +177,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
 
   @VisibleForTesting
   public static PerNodeTimelineCollectorsAuxService
-      launchServer(String[] args, NodeTimelineCollectorManager collectorManager) {
+      launchServer(String[] args, NodeTimelineCollectorManager collectorManager,
+      Configuration conf) {
     Thread
       .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(
@@ -187,7 +190,6 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
           new PerNodeTimelineCollectorsAuxService(collectorManager);
       ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
           SHUTDOWN_HOOK_PRIORITY);
-      YarnConfiguration conf = new YarnConfiguration();
       auxService.init(conf);
       auxService.start();
     } catch (Throwable t) {
@@ -210,6 +212,9 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
   }
 
   public static void main(String[] args) {
-    launchServer(args, null);
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    launchServer(args, null, conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index afe1536..6b4213d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
@@ -66,6 +67,10 @@ public class TimelineReaderServer extends CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      throw new YarnException("timeline service v.2 is not enabled");
+    }
+
     TimelineReader timelineReaderStore = createTimelineReaderStore(conf);
     addService(timelineReaderStore);
     timelineReaderManager = createTimelineReaderManager(timelineReaderStore);
@@ -143,7 +148,8 @@ public class TimelineReaderServer extends CompositeService {
     return readerWebServer.getConnectorAddress(0).getPort();
   }
 
-  static TimelineReaderServer startTimelineReaderServer(String[] args) {
+  static TimelineReaderServer startTimelineReaderServer(String[] args,
+      Configuration conf) {
     Thread.setDefaultUncaughtExceptionHandler(
         new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(TimelineReaderServer.class,
@@ -154,7 +160,6 @@ public class TimelineReaderServer extends CompositeService {
       ShutdownHookManager.get().addShutdownHook(
           new CompositeServiceShutdownHook(timelineReaderServer),
           SHUTDOWN_HOOK_PRIORITY);
-      YarnConfiguration conf = new YarnConfiguration();
       timelineReaderServer.init(conf);
       timelineReaderServer.start();
     } catch (Throwable t) {
@@ -165,6 +170,9 @@ public class TimelineReaderServer extends CompositeService {
   }
 
   public static void main(String[] args) {
-    startTimelineReaderServer(args);
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    startTimelineReaderServer(args, conf);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index dafc76e..4fdf47e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -50,11 +50,16 @@ import java.io.IOException;
 public class TestPerNodeTimelineCollectorsAuxService {
   private ApplicationAttemptId appAttemptId;
   private PerNodeTimelineCollectorsAuxService auxService;
+  private Configuration conf;
 
   public TestPerNodeTimelineCollectorsAuxService() {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    conf = new YarnConfiguration();
+    // enable timeline service v.2
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
   }
 
   @After
@@ -134,7 +139,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
     try {
       auxService =
           PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
-              createCollectorManager());
+              createCollectorManager(), conf);
     } catch (ExitUtil.ExitException e) {
       assertEquals(0, e.status);
       ExitUtil.resetFirstExitException();
@@ -160,7 +165,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
     NodeTimelineCollectorManager collectorManager = createCollectorManager();
     PerNodeTimelineCollectorsAuxService auxService =
         spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
-    auxService.init(new YarnConfiguration());
+    auxService.init(conf);
     auxService.start();
     return auxService;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java
index 7098814..b42488c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java
@@ -30,8 +30,11 @@ public class TestTimelineReaderServer {
 
   @Test(timeout = 60000)
   public void testStartStopServer() throws Exception {
+    @SuppressWarnings("resource")
     TimelineReaderServer server = new TimelineReaderServer();
     Configuration config = new YarnConfiguration();
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
         "localhost:0");
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index 45bce2f..91f6ee5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -72,6 +72,8 @@ public class TestTimelineReaderWebServices {
   public void init() throws Exception {
     try {
       Configuration config = new YarnConfiguration();
+      config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+      config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
       config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, 
           "localhost:0");
       config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 3b285aa..818cd89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -238,6 +238,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
   public void init() throws Exception {
     try {
       Configuration config = util.getConfiguration();
+      config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+      config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
       config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
           "localhost:0");
       config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");


[2/2] hadoop git commit: YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no impact when it's turned off. Contributed by Sangjin Lee.

Posted by gt...@apache.org.
YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no
impact when it's turned off. Contributed by Sangjin Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/45510fc6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/45510fc6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/45510fc6

Branch: refs/heads/feature-YARN-2928
Commit: 45510fc6bdcfe48aa8aa192f511ebfdada370c78
Parents: 0801563
Author: Li Lu <gt...@apache.org>
Authored: Fri Dec 11 11:17:34 2015 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Fri Dec 11 11:17:34 2015 -0800

----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      |  64 ++++---
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  11 +-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   5 -
 .../src/main/resources/mapred-default.xml       |   7 -
 .../mapred/TestMRTimelineEventHandling.java     |   5 +-
 .../hadoop/mapreduce/v2/MiniMRYarnCluster.java  |   2 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |  58 +++++-
 .../distributedshell/ApplicationMaster.java     | 191 ++++++++-----------
 .../applications/distributedshell/Client.java   |  16 --
 .../distributedshell/TestDistributedShell.java  |  14 +-
 .../impl/pb/AllocateResponsePBImpl.java         |   4 +-
 .../hadoop/yarn/client/api/TimelineClient.java  |  18 +-
 .../client/api/impl/TimelineClientImpl.java     |  11 +-
 .../src/main/resources/yarn-default.xml         |   5 +-
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  10 +-
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  10 +-
 .../hadoop/yarn/server/nodemanager/Context.java |   3 +-
 .../yarn/server/nodemanager/NodeManager.java    |  23 ++-
 .../nodemanager/NodeStatusUpdaterImpl.java      |  48 +++--
 .../collectormanager/NMCollectorService.java    |  10 +-
 .../containermanager/ContainerManagerImpl.java  |  59 ++++--
 .../application/ApplicationImpl.java            |  70 +++++--
 .../monitor/ContainersMonitorImpl.java          |  11 +-
 .../timelineservice/NMTimelinePublisher.java    |  49 ++---
 .../TestContainerManagerRecovery.java           |  11 +-
 .../application/TestApplication.java            |   3 +-
 .../nodemanager/webapp/TestNMWebServices.java   |   9 +-
 .../ApplicationMasterService.java               |  11 +-
 .../server/resourcemanager/ClientRMService.java |  35 ++--
 .../server/resourcemanager/RMAppManager.java    |   7 +-
 .../server/resourcemanager/ResourceManager.java |  33 ++--
 .../resourcemanager/ResourceTrackerService.java |  21 +-
 .../resourcemanager/amlauncher/AMLauncher.java  |  15 +-
 .../metrics/TimelineServiceV2Publisher.java     |   2 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  16 +-
 .../resourcemanager/TestClientRMService.java    |   3 +
 .../metrics/TestSystemMetricsPublisher.java     |   2 +-
 .../TestSystemMetricsPublisherForV2.java        |   1 +
 .../TestTimelineServiceClientIntegration.java   |  30 ++-
 .../PerNodeTimelineCollectorsAuxService.java    |  15 +-
 .../reader/TimelineReaderServer.java            |  14 +-
 ...TestPerNodeTimelineCollectorsAuxService.java |   9 +-
 .../reader/TestTimelineReaderServer.java        |   3 +
 .../reader/TestTimelineReaderWebServices.java   |   2 +
 ...stTimelineReaderWebServicesHBaseStorage.java |   2 +
 45 files changed, 540 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index f52e654..19699fb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -19,9 +19,6 @@
 package org.apache.hadoop.mapreduce.jobhistory;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -30,7 +27,11 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,7 +47,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -56,9 +56,9 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
-import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -74,9 +74,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * The job history events get routed to this class. This class writes the Job
  * history events to the DFS directly into a staging dir and then moved to a
@@ -122,20 +121,17 @@ public class JobHistoryEventHandler extends AbstractService
 
   protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
-  
-  // For posting entities in new timeline service in a non-blocking way
-  // TODO YARN-3367 replace with event loop in TimelineClient.
-  private static ExecutorService threadPool =
-      Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-          .build());
 
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
   protected TimelineClient timelineClient;
   
-  private boolean newTimelineServiceEnabled = false;
+  private boolean timelineServiceV2Enabled = false;
+
+  // For posting entities in new timeline service in a non-blocking way
+  // TODO YARN-3367 replace with event loop in TimelineClient.
+  private ExecutorService threadPool;
 
   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@@ -265,22 +261,26 @@ public class JobHistoryEventHandler extends AbstractService
     // configuration status: off, on_with_v1 or on_with_v2.
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
         MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
-      if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-        
+      LOG.info("Emitting job history data to the timeline service is enabled");
+      if (YarnConfiguration.timelineServiceEnabled(conf)) {
+
         timelineClient = 
             ((MRAppMaster.RunningAppContext)context).getTimelineClient();
         timelineClient.init(conf);
-        newTimelineServiceEnabled = conf.getBoolean(
-            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
-            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
-        LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
-        LOG.info("Emitting job history data to the timeline server is enabled");
+        timelineServiceV2Enabled =
+            YarnConfiguration.timelineServiceV2Enabled(conf);
+        LOG.info("Timeline service is enabled; version: " +
+            YarnConfiguration.getTimelineServiceVersion(conf));
+        if (timelineServiceV2Enabled) {
+          // initialize the thread pool for v.2 timeline service
+          threadPool = createThreadPool();
+        }
       } else {
         LOG.info("Timeline service is not enabled");
       }
     } else {
-      LOG.info("Emitting job history data to the timeline server is not enabled");
+      LOG.info("Emitting job history data to the timeline server is not " +
+          "enabled");
     }
 
     // Flag for setting
@@ -448,19 +448,27 @@ public class JobHistoryEventHandler extends AbstractService
     if (timelineClient != null) {
       timelineClient.stop();
     }
-    shutdownAndAwaitTermination();
+    if (threadPool != null) {
+      shutdownAndAwaitTermination();
+    }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
   }
   
   // TODO remove threadPool after adding non-blocking call in TimelineClient
-  private static void shutdownAndAwaitTermination() {
+  private ExecutorService createThreadPool() {
+    return Executors.newCachedThreadPool(
+      new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+      .build());
+  }
+
+  private void shutdownAndAwaitTermination() {
     threadPool.shutdown();
     try {
       if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
         threadPool.shutdownNow(); 
         if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
-            LOG.error("ThreadPool did not terminate");
+          LOG.error("ThreadPool did not terminate");
       }
     } catch (InterruptedException ie) {
       threadPool.shutdownNow();
@@ -622,7 +630,7 @@ public class JobHistoryEventHandler extends AbstractService
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
         if (timelineClient != null) {
-          if (newTimelineServiceEnabled) {
+          if (timelineServiceV2Enabled) {
             processEventForNewTimelineService(historyEvent, event.getJobID(),
                 event.getTimestamp());
           } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index dafb6e9..a7c9245 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1017,14 +1017,9 @@ public class MRAppMaster extends CompositeService {
       this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
       if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
               MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
-            && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-                YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-
-        boolean newTimelineServiceEnabled = conf.getBoolean(
-            MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
-            MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
-            
-        if (newTimelineServiceEnabled) {
+            && YarnConfiguration.timelineServiceEnabled(conf)) {
+
+        if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
           // create new version TimelineClient
           timelineClient = TimelineClient.createTimelineClient(
               appAttemptID.getApplicationId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 3ab6eeb..3d1e841 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -467,11 +467,6 @@ public interface MRJobConfig {
     "mapreduce.job.emit-timeline-data";
   public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
       false;
-  
-  public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
-      "mapreduce.job.new-timeline-service.enabled";
-  public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
-      false;
 
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 0585234..6ece048 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -618,13 +618,6 @@
     </description>
 </property>
 
- <property>
-    <name>mapreduce.job.new-timeline-service.enabled</name>
-    <value>false</value>
-    <description>Specifies if posting job and task events to new timeline service.
-    </description>
-</property>
-
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index b3ea26e..7b322e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -161,11 +161,10 @@ public class TestMRTimelineEventHandling {
     LOG.info("testMRNewTimelineServiceEventHandling start.");
     Configuration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // enable new timeline service
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
 
-    // enable new timeline serivce in MR side
-    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
-
     // enable aux-service based timeline collectors
     conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
     conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index 18a4c14..edb825d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -173,7 +173,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
     boolean enableTimelineAuxService = false;
     if (nmAuxServices != null) {
       for (String nmAuxService: nmAuxServices) {
-        if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) {
+        if (nmAuxService.equals(TIMELINE_AUX_SERVICE_NAME)) {
           enableTimelineAuxService = true;
           break;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0165593..6a3854a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -83,6 +83,10 @@ public class YarnConfiguration extends Configuration {
         new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
             NM_CLIENT_MAX_NM_PROXIES)
     });
+    Configuration.addDeprecations(new DeprecationDelta[] {
+        new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+            SYSTEM_METRICS_PUBLISHER_ENABLED)
+    });
   }
 
   //Configurations
@@ -383,7 +387,8 @@ public class YarnConfiguration extends Configuration {
 
   /**
    *  The setting that controls whether yarn system metrics is published on the
-   *  timeline server or not by RM. This configuration setting is for ATS V1
+   *  timeline server or not by RM. This configuration setting is for ATS V1.
+   *  This is now deprecated in favor of SYSTEM_METRICS_PUBLISHER_ENABLED.
    */
   public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
       + "system-metrics-publisher.enabled";
@@ -2344,13 +2349,52 @@ public class YarnConfiguration extends Configuration {
     }
     return clusterId;
   }
-  
-  public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+
+  // helper methods for timeline service configuration
+  /**
+   * Returns whether the timeline service is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service is enabled.
+   */
+  public static boolean timelineServiceEnabled(Configuration conf) {
     return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
-        && conf.getBoolean(
-            YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-            YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);  
+      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+  }
+
+  /**
+   * Returns the timeline service version. It does not check whether the
+   * timeline service itself is enabled.
+   *
+   * @param conf the configuration
+   * @return the timeline service version as a float.
+   */
+  public static float getTimelineServiceVersion(Configuration conf) {
+    return conf.getFloat(TIMELINE_SERVICE_VERSION,
+        DEFAULT_TIMELINE_SERVICE_VERSION);
+  }
+
+  /**
+   * Returns whether the timeline service v.2 is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service v.2 is enabled. V.2 refers to a
+   * version greater than or equal to 2 but smaller than 3.
+   */
+  public static boolean timelineServiceV2Enabled(Configuration conf) {
+    return timelineServiceEnabled(conf) &&
+        (int)getTimelineServiceVersion(conf) == 2;
+  }
+
+  /**
+   * Returns whether the system metrics publisher is enabled.
+   *
+   * @param conf the configuration
+   * @return whether the system metrics publisher is enabled.
+   */
+  public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
   }
 
   /* For debugging. mp configurations to system output as XML format. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index e27c947..380ba29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -217,14 +217,11 @@ public class ApplicationMaster {
   // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
 
-  private boolean newTimelineService = false;
+  private boolean timelineServiceV2 = false;
 
   // For posting entities in new timeline service in a non-blocking way
   // TODO replace with event loop in TimelineClient.
-  private static ExecutorService threadPool =
-      Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-          .build());
+  private ExecutorService threadPool;
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -314,8 +311,10 @@ public class ApplicationMaster {
       }
       appMaster.run();
       result = appMaster.finish();
-      
-      shutdownAndAwaitTermination();
+
+      if (appMaster.threadPool != null) {
+        appMaster.shutdownAndAwaitTermination();
+      }
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       LogManager.shutdown();
@@ -329,16 +328,22 @@ public class ApplicationMaster {
       System.exit(2);
     }
   }
-  
+
   //TODO remove threadPool after adding non-blocking call in TimelineClient
-  private static void shutdownAndAwaitTermination() {
+  private ExecutorService createThreadPool() {
+    return Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+        .build());
+  }
+
+  private void shutdownAndAwaitTermination() {
     threadPool.shutdown();
     try {
       // Wait a while for existing tasks to terminate
       if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
         threadPool.shutdownNow();
         if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
-            LOG.error("ThreadPool did not terminate");
+          LOG.error("ThreadPool did not terminate");
       }
     } catch (InterruptedException ie) {
       threadPool.shutdownNow();
@@ -404,8 +409,7 @@ public class ApplicationMaster {
         "No. of containers on which the shell command needs to be executed");
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("debug", false, "Dump out debug information");
-    opts.addOption("timeline_service_version", true,
-        "Version for timeline service");
+
     opts.addOption("help", false, "Print usage");
     CommandLine cliParser = new GnuParser().parse(opts, args);
 
@@ -542,27 +546,15 @@ public class ApplicationMaster {
     requestPriority = Integer.parseInt(cliParser
         .getOptionValue("priority", "0"));
 
-    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-      if (cliParser.hasOption("timeline_service_version")) {
-        String timelineServiceVersion =
-            cliParser.getOptionValue("timeline_service_version", "v1");
-        if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
-          newTimelineService = false;
-        } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) {
-          newTimelineService = true;
-        } else {
-          throw new IllegalArgumentException(
-              "timeline_service_version is not set properly, should be 'v1' or 'v2'");
-        }
+    if (YarnConfiguration.timelineServiceEnabled(conf)) {
+      timelineServiceV2 =
+          YarnConfiguration.timelineServiceV2Enabled(conf);
+      if (timelineServiceV2) {
+        threadPool = createThreadPool();
       }
     } else {
       timelineClient = null;
       LOG.warn("Timeline service is not enabled");
-      if (cliParser.hasOption("timeline_service_version")) {
-        throw new IllegalArgumentException(
-            "Timeline service is not enabled");
-      }
     }
 
     return true;
@@ -623,16 +615,16 @@ public class ApplicationMaster {
     nmClientAsync.start();
 
     startTimelineClient(conf);
-    // need to bind timelineClient
-    amRMClient.registerTimelineClient(timelineClient);
+    if (timelineServiceV2) {
+      // need to bind timelineClient
+      amRMClient.registerTimelineClient(timelineClient);
+    }
     if(timelineClient != null) {
-      if (newTimelineService) {
-        publishApplicationAttemptEventOnNewTimelineService(timelineClient, 
-            appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, 
-            appSubmitterUgi);
+      if (timelineServiceV2) {
+        publishApplicationAttemptEventOnTimelineServiceV2(
+            DSEvent.DS_APP_ATTEMPT_START);
       } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
+        publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_START);
       }
     }
 
@@ -703,10 +695,9 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
-          if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+          if (YarnConfiguration.timelineServiceEnabled(conf)) {
             // Creating the Timeline Client
-            if (newTimelineService) {
+            if (timelineServiceV2) {
               timelineClient = TimelineClient.createTimelineClient(
                   appAttemptID.getApplicationId());
             } else {
@@ -742,13 +733,11 @@ public class ApplicationMaster {
     }
 
     if (timelineClient != null) {
-      if (newTimelineService) {
-        publishApplicationAttemptEventOnNewTimelineService(timelineClient,
-          appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId,
-          appSubmitterUgi);
+      if (timelineServiceV2) {
+        publishApplicationAttemptEventOnTimelineServiceV2(
+            DSEvent.DS_APP_ATTEMPT_END);
       } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+        publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_END);
       }
     }
 
@@ -855,12 +844,10 @@ public class ApplicationMaster {
               + containerStatus.getContainerId());
         }
         if(timelineClient != null) {
-          if (newTimelineService) {
-            publishContainerEndEventOnNewTimelineService(
-                timelineClient, containerStatus, domainId, appSubmitterUgi);
+          if (timelineServiceV2) {
+            publishContainerEndEventOnTimelineServiceV2(containerStatus);
           } else {
-            publishContainerEndEvent(
-                timelineClient, containerStatus, domainId, appSubmitterUgi);
+            publishContainerEndEvent(containerStatus);
           }
         }
       }
@@ -981,14 +968,11 @@ public class ApplicationMaster {
         applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
       }
       if(applicationMaster.timelineClient != null) {
-        if (applicationMaster.newTimelineService) {
-            ApplicationMaster.publishContainerStartEventOnNewTimelineService(
-                applicationMaster.timelineClient, container,
-                applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        if (applicationMaster.timelineServiceV2) {
+            applicationMaster.publishContainerStartEventOnTimelineServiceV2(
+                container);
         } else {
-          ApplicationMaster.publishContainerStartEvent(
-              applicationMaster.timelineClient, container,
-              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+          applicationMaster.publishContainerStartEvent(container);
         }
       }
     }
@@ -1195,14 +1179,12 @@ public class ApplicationMaster {
     }
   }
   
-  private static void publishContainerStartEvent(
-      final TimelineClient timelineClient, Container container, String domainId,
-      UserGroupInformation ugi) {
+  private void publishContainerStartEvent(Container container) {
     final TimelineEntity entity = new TimelineEntity();
     entity.setEntityId(container.getId().toString());
     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
     TimelineEvent event = new TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
     event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@@ -1211,12 +1193,13 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
-        @Override
-        public TimelinePutResponse run() throws Exception {
-          return timelineClient.putEntities(entity);
-        }
-      });
+      appSubmitterUgi.doAs(
+          new PrivilegedExceptionAction<TimelinePutResponse>() {
+            @Override
+            public TimelinePutResponse run() throws Exception {
+              return timelineClient.putEntities(entity);
+            }
+        });
     } catch (Exception e) {
       LOG.error("Container start event could not be published for "
           + container.getId().toString(),
@@ -1224,14 +1207,12 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishContainerEndEvent(
-      final TimelineClient timelineClient, ContainerStatus container,
-      String domainId, UserGroupInformation ugi) {
+  private void publishContainerEndEvent(ContainerStatus container) {
     final TimelineEntity entity = new TimelineEntity();
     entity.setEntityId(container.getContainerId().toString());
     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
     TimelineEvent event = new TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
     event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@@ -1246,14 +1227,12 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishApplicationAttemptEvent(
-      final TimelineClient timelineClient, String appAttemptId,
-      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
+  private void publishApplicationAttemptEvent(DSEvent appEvent) {
     final TimelineEntity entity = new TimelineEntity();
-    entity.setEntityId(appAttemptId);
+    entity.setEntityId(appAttemptID.toString());
     entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
     TimelineEvent event = new TimelineEvent();
     event.setEventType(appEvent.toString());
     event.setTimestamp(System.currentTimeMillis());
@@ -1264,7 +1243,7 @@ public class ApplicationMaster {
       LOG.error("App Attempt "
           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
           + " event could not be published for "
-          + appAttemptId.toString(), e);
+          + appAttemptID, e);
     }
   }
 
@@ -1296,27 +1275,24 @@ public class ApplicationMaster {
     return new Thread(runnableLaunchContainer);
   }
   
-  private static void publishContainerStartEventOnNewTimelineService(
-      final TimelineClient timelineClient, final Container container,
-      final String domainId, final UserGroupInformation ugi) {
+  private void publishContainerStartEventOnTimelineServiceV2(
+      final Container container) {
     Runnable publishWrapper = new Runnable() {
       public void run() {
-        publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
-            container, domainId, ugi);
+        publishContainerStartEventOnTimelineServiceV2Base(container);
       }
     };
     threadPool.execute(publishWrapper);
   }
 
-  private static void publishContainerStartEventOnNewTimelineServiceBase(
-      final TimelineClient timelineClient, Container container, String domainId,
-      UserGroupInformation ugi) {
+  private void publishContainerStartEventOnTimelineServiceV2Base(
+      Container container) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getId().toString());
     entity.setType(DSEntity.DS_CONTAINER.toString());
     //entity.setDomainId(domainId);
-    entity.addInfo("user", ugi.getShortUserName());
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
 
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
@@ -1327,7 +1303,7 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
           timelineClient.putEntities(entity);
@@ -1341,27 +1317,24 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishContainerEndEventOnNewTimelineService(
-      final TimelineClient timelineClient, final ContainerStatus container,
-      final String domainId, final UserGroupInformation ugi) {
+  private void publishContainerEndEventOnTimelineServiceV2(
+      final ContainerStatus container) {
     Runnable publishWrapper = new Runnable() {
       public void run() {
-          publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
-              container, domainId, ugi);
+          publishContainerEndEventOnTimelineServiceV2Base(container);
       }
     };
     threadPool.execute(publishWrapper);
   }
 
-  private static void publishContainerEndEventOnNewTimelineServiceBase(
-      final TimelineClient timelineClient, final ContainerStatus container,
-      final String domainId, final UserGroupInformation ugi) {
+  private void publishContainerEndEventOnTimelineServiceV2Base(
+      final ContainerStatus container) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getContainerId().toString());
     entity.setType(DSEntity.DS_CONTAINER.toString());
     //entity.setDomainId(domainId);
-    entity.addInfo("user", ugi.getShortUserName());
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
         new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
@@ -1371,7 +1344,7 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
           timelineClient.putEntities(entity);
@@ -1385,29 +1358,25 @@ public class ApplicationMaster {
     }
   }
 
-  private static void publishApplicationAttemptEventOnNewTimelineService(
-      final TimelineClient timelineClient, final String appAttemptId,
-      final DSEvent appEvent, final String domainId,
-      final UserGroupInformation ugi) {
+  private void publishApplicationAttemptEventOnTimelineServiceV2(
+      final DSEvent appEvent) {
 
     Runnable publishWrapper = new Runnable() {
       public void run() {
-        publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
-            appAttemptId, appEvent, domainId, ugi);
+        publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
       }
     };
     threadPool.execute(publishWrapper);
   }
 
-  private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
-      final TimelineClient timelineClient, String appAttemptId,
-      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
+  private void publishApplicationAttemptEventOnTimelineServiceV2Base(
+      DSEvent appEvent) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
-    entity.setId(appAttemptId);
+    entity.setId(appAttemptID.toString());
     entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
     //entity.setDomainId(domainId);
-    entity.addInfo("user", ugi.getShortUserName());
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
     event.setId(appEvent.toString());
@@ -1415,7 +1384,7 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
           timelineClient.putEntities(entity);
@@ -1426,7 +1395,7 @@ public class ApplicationMaster {
       LOG.error("App Attempt "
           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
           + " event could not be published for "
-          + appAttemptId.toString(),
+          + appAttemptID,
           e instanceof UndeclaredThrowableException ? e.getCause() : e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 2819c91..e66005e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -192,8 +192,6 @@ public class Client {
   // Command line options
   private Options opts;
 
-  private String timelineServiceVersion;
-
   private static final String shellCommandPath = "shellCommands";
   private static final String shellArgsPath = "shellArgs";
   private static final String appMasterJarPath = "AppMaster.jar";
@@ -269,7 +267,6 @@ public class Client {
     opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
     opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
     opts.addOption("log_properties", true, "log4j.properties file");
-    opts.addOption("timeline_service_version", true, "Version for timeline service");
     opts.addOption("keep_containers_across_application_attempts", false,
       "Flag to indicate whether to keep containers across application attempts." +
       " If the flag is true, running containers will not be killed when" +
@@ -371,16 +368,6 @@ public class Client {
           + " Specified virtual cores=" + amVCores);
     }
 
-    if (cliParser.hasOption("timeline_service_version")) {
-      timelineServiceVersion =
-        cliParser.getOptionValue("timeline_service_version", "v1");
-      if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") ||
-          timelineServiceVersion.trim().equalsIgnoreCase("v2"))) {
-        throw new IllegalArgumentException(
-              "timeline_service_version is not set properly, should be 'v1' or 'v2'");
-      }
-    }
-
     if (!cliParser.hasOption("jar")) {
       throw new IllegalArgumentException("No jar file specified for application master");
     }		
@@ -690,9 +677,6 @@ public class Client {
       vargs.add("--debug");
     }
 
-    if (timelineServiceVersion != null) {
-      vargs.add("--timeline_service_version " + timelineServiceVersion);
-    }
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index fe817c3..b3ff9b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -99,22 +99,19 @@ public class TestDistributedShell {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
 
     if (!testName.getMethodName().toLowerCase().contains("v2")) {
       // disable aux-service based timeline collectors
       conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
-      conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-          true);
-      conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false);
     } else {
+      // set version to 2
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
       // enable aux-service based timeline collectors
       conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
       conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
           + TIMELINE_AUX_SERVICE_NAME + ".class",
           PerNodeTimelineCollectorsAuxService.class.getName());
-      conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-          false);
     }
     conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
     conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
@@ -245,12 +242,7 @@ public class TestDistributedShell {
     }
     boolean isTestingTimelineV2 = false;
     if (timelineVersion.equalsIgnoreCase("v2")) {
-      String[] timelineArgs = {
-          "--timeline_service_version",
-          "v2"
-      };
       isTestingTimelineV2 = true;
-      args = mergeArgs(args, timelineArgs);
       if (!defaultFlow) {
         String[] flowArgs = {
             "--flow_name",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 7176146..d096a6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -403,7 +403,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   }
 
   @Override
-  public Priority getApplicationPriority() {
+  public synchronized Priority getApplicationPriority() {
     AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (this.appPriority != null) {
       return this.appPriority;
@@ -416,7 +416,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   }
 
   @Override
-  public void setApplicationPriority(Priority priority) {
+  public synchronized void setApplicationPriority(Priority priority) {
     maybeInitBuilder();
     if (priority == null)
       builder.clearApplicationPriority();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index f2707ba..9772dc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -48,17 +48,21 @@ public abstract class TimelineClient extends AbstractService {
    * current user may use {@link UserGroupInformation#doAs} another user to
    * construct and initialize a timeline client if the following operations are
    * supposed to be conducted by that user.
-   *
-   * @return a timeline client
    */
   protected ApplicationId contextAppId;
 
+  /**
+   * Creates an instance of the timeline v.1.x client.
+   */
   @Public
   public static TimelineClient createTimelineClient() {
     TimelineClient client = new TimelineClientImpl();
     return client;
   }
 
+  /**
+   * Creates an instance of the timeline v.2 client.
+   */
   @Public
   public static TimelineClient createTimelineClient(ApplicationId appId) {
     TimelineClient client = new TimelineClientImpl(appId);
@@ -156,8 +160,9 @@ public abstract class TimelineClient extends AbstractService {
   /**
    * <p>
    * Send the information of a number of conceptual entities to the timeline
-   * aggregator. It is a blocking API. The method will not return until all the
-   * put entities have been persisted.
+   * service v.2 collector. It is a blocking API. The method will not return
+   * until all the put entities have been persisted. If this method is invoked
+   * for a non-v.2 timeline client instance, a YarnException is thrown.
    * </p>
    *
    * @param entities
@@ -173,8 +178,9 @@ public abstract class TimelineClient extends AbstractService {
   /**
    * <p>
    * Send the information of a number of conceptual entities to the timeline
-   * aggregator. It is an asynchronous API. The method will return once all the
-   * entities are received.
+   * service v.2 collector. It is an asynchronous API. The method will return
+   * once all the entities are received. If this method is invoked for a
+   * non-v.2 timeline client instance, a YarnException is thrown.
    * </p>
    *
    * @param entities

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 8312b6d..3a624ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -124,7 +124,7 @@ public class TimelineClientImpl extends TimelineClient {
   private int maxServiceRetries;
   private long serviceRetryInterval;
 
-  private boolean newTimelineService = false;
+  private boolean timelineServiceV2 = false;
 
   @Private
   @VisibleForTesting
@@ -270,7 +270,7 @@ public class TimelineClientImpl extends TimelineClient {
 
   public TimelineClientImpl(ApplicationId applicationId) {
     super(TimelineClientImpl.class.getName(), applicationId);
-    this.newTimelineService = true;
+    this.timelineServiceV2 = true;
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -299,13 +299,13 @@ public class TimelineClientImpl extends TimelineClient {
         new TimelineURLConnectionFactory()), cc);
     TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
     // TODO need to cleanup filter retry later.
-    if (!newTimelineService) {
+    if (!timelineServiceV2) {
       client.addFilter(retryFilter);
     }
 
     // old version timeline service need to get address from configuration
     // while new version need to auto discovery (with retry).
-    if (newTimelineService) {
+    if (timelineServiceV2) {
       maxServiceRetries = conf.getInt(
           YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
@@ -353,6 +353,9 @@ public class TimelineClientImpl extends TimelineClient {
   private void putEntities(boolean async,
       org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
       throws IOException, YarnException {
+    if (!timelineServiceV2) {
+      throw new YarnException("v.2 method is invoked on a v.1.x client");
+    }
     org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
         entitiesContainer =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 99f84e6..a9adbbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -771,14 +771,15 @@
   <property>
     <description>The setting that controls whether yarn system metrics is
     published to the Timeline server (version one) or not, by RM. 
-    This configuration is deprecated.</description>
+    This configuration is now deprecated in favor of
+    yarn.system-metrics-publisher.enabled.</description>
     <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>
 
   <property>
     <description>The setting that controls whether yarn system metrics is
-    published on the Timeline server (version two) or not by RM And NM.</description>
+    published to the Timeline service or not by the RM and NM.</description>
     <name>yarn.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index fa0cf5c..066abfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -261,10 +261,12 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private void initRegisteredCollectors() {
     NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
     List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList();
-    this.registeredCollectors = new HashMap<ApplicationId, String> ();
-    for (AppCollectorsMapProto c : list) {
-      ApplicationId appId = convertFromProtoFormat(c.getAppId());
-      this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+    if (!list.isEmpty()) {
+      this.registeredCollectors = new HashMap<>();
+      for (AppCollectorsMapProto c : list) {
+        ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 2521b9c..151006b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -531,10 +531,12 @@ public class NodeHeartbeatResponsePBImpl extends
   private void initAppCollectorsMap() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     List<AppCollectorsMapProto> list = p.getAppCollectorsMapList();
-    this.appCollectorsMap = new HashMap<ApplicationId, String> ();
-    for (AppCollectorsMapProto c : list) {
-      ApplicationId appId = convertFromProtoFormat(c.getAppId());
-      this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+    if (!list.isEmpty()) {
+      this.appCollectorsMap = new HashMap<>();
+      for (AppCollectorsMapProto c : list) {
+        ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 0b378a1..8fce422 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -64,7 +64,8 @@ public interface Context {
 
   /**
    * Get the registered collectors that located on this NM.
-   * @return registered
+   * @return registered collectors, or null if the timeline service v.2 is not
+   * enabled
    */
   Map<ApplicationId, String> getRegisteredCollectors();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 601bd04..da8a13a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -97,6 +97,7 @@ public class NodeManager extends CompositeService
   private Context context;
   private AsyncDispatcher dispatcher;
   private ContainerManagerImpl containerManager;
+  // the NM collector service is set only if the timeline service v.2 is enabled
   private NMCollectorService nmCollectorService;
   private NodeStatusUpdater nodeStatusUpdater;
   private NodeResourceMonitor nodeResourceMonitor;
@@ -356,8 +357,10 @@ public class NodeManager extends CompositeService
 
     DefaultMetricsSystem.initialize("NodeManager");
 
-    this.nmCollectorService = createNMCollectorService(context);
-    addService(nmCollectorService);
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      this.nmCollectorService = createNMCollectorService(context);
+      addService(nmCollectorService);
+    }
 
     // StatusUpdater should be added last so that it get started last 
     // so that we make sure everything is up before registering with RM. 
@@ -457,8 +460,7 @@ public class NodeManager extends CompositeService
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
-    protected Map<ApplicationId, String> registeredCollectors =
-        new ConcurrentHashMap<ApplicationId, String>();
+    protected Map<ApplicationId, String> registeredCollectors;
 
     protected final ConcurrentMap<ContainerId,
         org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@@ -484,6 +486,9 @@ public class NodeManager extends CompositeService
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
         NMStateStoreService stateStore, Configuration conf) {
+      if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+        this.registeredCollectors = new ConcurrentHashMap<>();
+      }
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
       this.dirsHandler = dirsHandler;
@@ -709,7 +714,14 @@ public class NodeManager extends CompositeService
     return this.context;
   }
 
-  // For testing
+  /**
+   * Returns the NM collector service. It should be used only for testing
+   * purposes.
+   *
+   * @return the NM collector service, or null if the timeline service v.2 is
+   * not enabled
+   */
+  @VisibleForTesting
   NMCollectorService getNMCollectorService() {
     return this.nmCollectorService;
   }
@@ -717,6 +729,7 @@ public class NodeManager extends CompositeService
   public static void main(String[] args) throws IOException {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
+    @SuppressWarnings("resource")
     NodeManager nodeManager = new NodeManager();
     Configuration conf = new YarnConfiguration();
     new GenericOptionsParser(conf, args);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index d39204f..39c846c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -821,7 +821,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               dispatcher.getEventHandler().handle(
                   new CMgrSignalContainersEvent(containersToSignal));
             }
-            if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
+            if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
               updateTimelineClientsAddress(response);
             }
 
@@ -853,7 +853,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       /**
        * Caller should take care of sending non null nodelabels for both
        * arguments
-       * 
+       *
        * @param nodeLabelsNew
        * @param nodeLabelsOld
        * @return if the New node labels are diff from the older one.
@@ -869,27 +869,37 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
       private void updateTimelineClientsAddress(
           NodeHeartbeatResponse response) {
-        Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = 
-            response.getAppCollectorsMap().entrySet();
-        for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
-          ApplicationId appId = entry.getKey();
-          String collectorAddr = entry.getValue();
-
-          // Only handle applications running on local node.
-          // Not include apps with timeline collectors running in local
-          Application application = context.getApplications().get(appId);
-          if (application != null &&
-              !context.getRegisteredCollectors().containsKey(appId)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sync a new collector address: " + collectorAddr + 
-                  " for application: " + appId + " from RM.");
+        Map<ApplicationId, String> knownCollectorsMap =
+            response.getAppCollectorsMap();
+        if (knownCollectorsMap == null) {
+          LOG.warn("the collectors map is null");
+        } else {
+          Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
+              knownCollectorsMap.entrySet();
+          for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
+            ApplicationId appId = entry.getKey();
+            String collectorAddr = entry.getValue();
+
+            // Only handle applications running on local node.
+            // Not include apps with timeline collectors running in local
+            Application application = context.getApplications().get(appId);
+            // TODO this logic could be problematic if the collector address
+            // gets updated due to NM restart or collector service failure
+            if (application != null &&
+                !context.getRegisteredCollectors().containsKey(appId)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sync a new collector address: " + collectorAddr +
+                    " for application: " + appId + " from RM.");
+              }
+              TimelineClient client = application.getTimelineClient();
+              if (client != null) {
+                client.setTimelineServiceAddress(collectorAddr);
+              }
             }
-            TimelineClient client = application.getTimelineClient();
-            client.setTimelineServiceAddress(collectorAddr);
           }
         }
       }
-      
+
       private void updateMasterKeys(NodeHeartbeatResponse response) {
         // See if the master-key has rolled over
         MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index db79ee5..3ba81ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -43,6 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 
+/**
+ * Service that handles collector information. It is used only if the timeline
+ * service v.2 is enabled.
+ */
 public class NMCollectorService extends CompositeService implements
     CollectorNodemanagerProtocol {
 
@@ -113,9 +117,9 @@ public class NMCollectorService extends CompositeService implements
         String collectorAddr = collector.getCollectorAddr();
         newCollectorsMap.put(appId, collectorAddr);
         // set registered collector address to TimelineClient.
-        if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
-          TimelineClient client = 
-              context.getApplications().get(appId).getTimelineClient();
+        TimelineClient client =
+            context.getApplications().get(appId).getTimelineClient();
+        if (client != null) {
           client.setTimelineServiceAddress(collectorAddr);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index b010eee..e4668c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -190,7 +191,8 @@ public class ContainerManagerImpl extends CompositeService implements
 
   private long waitForContainersOnShutdownMillis;
 
-  private final NMTimelinePublisher nmMetricsPublisher;
+  // NM metrics publisher is set only if the timeline service v.2 is enabled
+  private NMTimelinePublisher nmMetricsPublisher;
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -218,8 +220,15 @@ public class ContainerManagerImpl extends CompositeService implements
     auxiliaryServices.registerServiceListener(this);
     addService(auxiliaryServices);
 
-    nmMetricsPublisher = createNMTimelinePublisher(context);
-    context.setNMTimelinePublisher(nmMetricsPublisher);
+    // initialize the metrics publisher if the timeline service v.2 is enabled
+    // and the system publisher is enabled
+    Configuration conf = context.getConf();
+    if (YarnConfiguration.timelineServiceV2Enabled(conf) &&
+        YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+      LOG.info("YARN system metrics publishing service is enabled");
+      nmMetricsPublisher = createNMTimelinePublisher(context);
+      context.setNMTimelinePublisher(nmMetricsPublisher);
+    }
     this.containersMonitor =
         new ContainersMonitorImpl(exec, dispatcher, this.context);
     addService(this.containersMonitor);
@@ -237,7 +246,6 @@ public class ContainerManagerImpl extends CompositeService implements
     
     addService(dispatcher);
 
-
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -335,7 +343,7 @@ public class ContainerManagerImpl extends CompositeService implements
     LOG.info("Recovering application " + appId);
     //TODO: Recover flow and flow run ID
     ApplicationImpl app = new ApplicationImpl(
-        dispatcher, p.getUser(), null, null, 0, appId, creds, context);
+        dispatcher, p.getUser(), appId, creds, context);
     context.getApplications().put(appId, app);
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
@@ -941,20 +949,27 @@ public class ContainerManagerImpl extends CompositeService implements
     try {
       if (!serviceStopped) {
         // Create the application
-        String flowName = launchContext.getEnvironment().get(
-            TimelineUtils.FLOW_NAME_TAG_PREFIX);
-        String flowVersion = launchContext.getEnvironment().get(
-            TimelineUtils.FLOW_VERSION_TAG_PREFIX);
-        String flowRunIdStr = launchContext.getEnvironment().get(
-            TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
-        long flowRunId = 0L;
-        if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
-          flowRunId = Long.parseLong(flowRunIdStr);
+        // populate the flow context from the launch context if the timeline
+        // service v.2 is enabled
+        FlowContext flowContext = null;
+        if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+          String flowName = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_NAME_TAG_PREFIX);
+          String flowVersion = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+          String flowRunIdStr = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+          long flowRunId = 0L;
+          if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+            flowRunId = Long.parseLong(flowRunIdStr);
+          }
+          flowContext =
+              new FlowContext(flowName, flowVersion, flowRunId);
         }
         if (!context.getApplications().containsKey(applicationID)) {
           Application application =
-              new ApplicationImpl(dispatcher, user, flowName, flowVersion,
-                  flowRunId, applicationID, credentials, context);
+              new ApplicationImpl(dispatcher, user, flowContext,
+                  applicationID, credentials, context);
           if (context.getApplications().putIfAbsent(applicationID,
               application) == null) {
             LOG.info("Creating a new application reference for app "
@@ -1310,7 +1325,9 @@ public class ContainerManagerImpl extends CompositeService implements
       Container c = containers.get(event.getContainerID());
       if (c != null) {
         c.handle(event);
-        nmMetricsPublisher.publishContainerEvent(event);
+        if (nmMetricsPublisher != null) {
+          nmMetricsPublisher.publishContainerEvent(event);
+        }
       } else {
         LOG.warn("Event " + event + " sent to absent container " +
             event.getContainerID());
@@ -1326,7 +1343,9 @@ public class ContainerManagerImpl extends CompositeService implements
               event.getApplicationID());
       if (app != null) {
         app.handle(event);
-        nmMetricsPublisher.publishApplicationEvent(event);
+        if (nmMetricsPublisher != null) {
+          nmMetricsPublisher.publishApplicationEvent(event);
+        }
       } else {
         LOG.warn("Event " + event + " sent to absent application "
             + event.getApplicationID());
@@ -1349,7 +1368,9 @@ public class ContainerManagerImpl extends CompositeService implements
     @Override
     public void handle(LocalizationEvent event) {
       origLocalizationEventHandler.handle(event);
-      timelinePublisher.publishLocalizationEvent(event);
+      if (timelinePublisher != null) {
+        timelinePublisher.publishLocalizationEvent(event);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6e87cfd..93c6758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -67,9 +67,8 @@ public class ApplicationImpl implements Application {
 
   final Dispatcher dispatcher;
   final String user;
-  final String flowName;
-  final String flowVersion;
-  final long flowRunId;
+  // flow context is set only if the timeline service v.2 is enabled
+  private FlowContext flowContext;
   final ApplicationId appId;
   final Credentials credentials;
   Map<ApplicationAccessType, String> applicationACLs;
@@ -86,14 +85,16 @@ public class ApplicationImpl implements Application {
   Map<ContainerId, Container> containers =
       new HashMap<ContainerId, Container>();
 
-  public ApplicationImpl(Dispatcher dispatcher, String user, String flowName,
-      String flowVersion, long flowRunId, ApplicationId appId,
-      Credentials credentials, Context context) {
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      ApplicationId appId, Credentials credentials, Context context) {
+    this(dispatcher, user, null, appId, credentials, context);
+  }
+
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      FlowContext flowContext, ApplicationId appId, Credentials credentials,
+      Context context) {
     this.dispatcher = dispatcher;
     this.user = user;
-    this.flowName = flowName;
-    this.flowVersion = flowVersion;
-    this.flowRunId = flowRunId;
     this.appId = appId;
     this.credentials = credentials;
     this.aclsManager = context.getApplicationACLsManager();
@@ -103,11 +104,44 @@ public class ApplicationImpl implements Application {
     writeLock = lock.writeLock();
     stateMachine = stateMachineFactory.make(this);
     Configuration conf = context.getConf();
-    if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
-      createAndStartTimelineClient(conf);
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      if (flowContext == null) {
+        throw new IllegalArgumentException("flow context cannot be null");
+      }
+      this.flowContext = flowContext;
+      if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+        createAndStartTimelineClient(conf);
+      }
     }
   }
-  
+
+  /**
+   * Immutable holder for an application's flow name, flow version and run id.
+   */
+  public static class FlowContext {
+    private final String flowName; // stored as passed in; not null-checked
+    private final String flowVersion; // stored as passed in; not null-checked
+    private final long flowRunId;
+
+    public FlowContext(String flowName, String flowVersion, long flowRunId) {
+      this.flowName = flowName;
+      this.flowVersion = flowVersion;
+      this.flowRunId = flowRunId;
+    }
+
+    public String getFlowName() {
+      return flowName;
+    }
+
+    public String getFlowVersion() {
+      return flowVersion;
+    }
+
+    public long getFlowRunId() {
+      return flowRunId;
+    }
+  }
+
   private void createAndStartTimelineClient(Configuration conf) {
     // create and start timeline client
     this.timelineClient = TimelineClient.createTimelineClient(appId);
@@ -454,7 +488,11 @@ public class ApplicationImpl implements Application {
       // Remove collectors info for finished apps.
       // TODO check we remove related collectors info in failure cases
       // (YARN-3038)
-      app.context.getRegisteredCollectors().remove(app.getAppId());
+      Map<ApplicationId, String> registeredCollectors =
+          app.context.getRegisteredCollectors();
+      if (registeredCollectors != null) {
+        registeredCollectors.remove(app.getAppId());
+      }
       // stop timelineClient when application get finished.
       TimelineClient timelineClient = app.getTimelineClient();
       if (timelineClient != null) {
@@ -521,16 +559,16 @@ public class ApplicationImpl implements Application {
 
   @Override
   public String getFlowName() {
-    return flowName;
+    return flowContext == null ? null : flowContext.getFlowName();
   }
 
   @Override
   public String getFlowVersion() {
-    return flowVersion;
+    return flowContext == null ? null : flowContext.getFlowVersion();
   }
 
   @Override
   public long getFlowRunId() {
-    return flowRunId;
+    return flowContext == null ? 0L : flowContext.getFlowRunId();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index dfa32ac..589cf75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
@@ -559,9 +560,13 @@ public class ContainersMonitorImpl extends AbstractService implements
 
             ContainerImpl container =
                 (ContainerImpl) context.getContainers().get(containerId);
-            container.getNMTimelinePublisher().reportContainerResourceUsage(
-                container, currentTime, pId, currentPmemUsage,
-                cpuUsageTotalCoresPercentage);
+            NMTimelinePublisher nmMetricsPublisher =
+                container.getNMTimelinePublisher();
+            if (nmMetricsPublisher != null) {
+              nmMetricsPublisher.reportContainerResourceUsage(
+                  container, currentTime, pId, currentPmemUsage,
+                  cpuUsageTotalCoresPercentage);
+            }
           } catch (Exception e) {
             // Log the exception and proceed to the next container.
             LOG.warn("Uncaught exception in ContainersMonitorImpl "