Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/13 20:33:18 UTC

[1/2] hadoop git commit: YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 0a3c14782 -> 17842a3f6


http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
new file mode 100644
index 0000000..9830a80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSystemMetricsPublisherForV2 {
+
+  /**
+   * The folder into which FileSystemTimelineWriterImpl writes the entities.
+   */
+  protected static File testRootDir = new File("target",
+      TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
+      .getAbsoluteFile();
+
+  private static SystemMetricsPublisher metricsPublisher;
+  private static DrainDispatcher dispatcher = new DrainDispatcher();
+  private static final String DEFAULT_FLOW_VERSION = "1";
+  private static final long DEFAULT_FLOW_RUN = 1;
+
+  private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
+
+  private static RMTimelineCollectorManager rmTimelineCollectorManager;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    if (testRootDir.exists()) {
+      // clean up beforehand
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+
+    RMContext rmContext = mock(RMContext.class);
+    rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
+    when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
+    rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext);
+    when(rmContext.getRMTimelineCollectorManager()).thenReturn(
+        rmTimelineCollectorManager);
+
+    Configuration conf = getTimelineV2Conf();
+    rmTimelineCollectorManager.init(conf);
+    rmTimelineCollectorManager.start();
+
+    metricsPublisher = new SystemMetricsPublisher(rmContext) {
+      @Override
+      Dispatcher createDispatcher(
+          TimelineServicePublisher timelineServicePublisher) {
+        return dispatcher;
+      }
+    };
+    metricsPublisher.init(conf);
+    metricsPublisher.start();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (testRootDir.exists()) {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+    if (rmTimelineCollectorManager != null) {
+      rmTimelineCollectorManager.stop();
+    }
+    if (metricsPublisher != null) {
+      metricsPublisher.stop();
+    }
+  }
+
+  private static Configuration getTimelineV2Conf() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
+    conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+        true);
+    try {
+      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+          testRootDir.getCanonicalPath());
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert
+          .fail("Exception while setting the TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
+    }
+    return conf;
+  }
+
+  @Test
+  public void testSystemMetricPublisherInitialization() {
+    @SuppressWarnings("resource")
+    SystemMetricsPublisher metricsPublisher =
+        new SystemMetricsPublisher(mock(RMContext.class));
+    try {
+      Configuration conf = getTimelineV2Conf();
+      conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+          false);
+      metricsPublisher.init(conf);
+      assertFalse(
+          "Default configuration should not publish container Metrics from RM",
+          metricsPublisher.isPublishContainerMetrics());
+
+      metricsPublisher.stop();
+
+      metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
+      conf = getTimelineV2Conf();
+      metricsPublisher.init(conf);
+      assertTrue("Expected to publish container Metrics from RM",
+          metricsPublisher.isPublishContainerMetrics());
+      assertTrue(
+          "MultiThreadedDispatcher expected when container Metrics is not published",
+          metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher);
+    } finally {
+      metricsPublisher.stop();
+    }
+  }
+
+  @Test(timeout = 1000000)
+  public void testPublishApplicationMetrics() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    RMApp app = createAppAndRegister(appId);
+
+    metricsPublisher.appCreated(app, app.getStartTime());
+    metricsPublisher.appACLsUpdated(app, "user1,user2", 4L);
+    metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
+    dispatcher.await();
+
+    String outputDirApp =
+        getTimelineEntityDir(app) + "/" + TimelineEntityType.YARN_APPLICATION
+            + "/";
+
+    File entityFolder = new File(outputDirApp);
+    Assert.assertTrue(entityFolder.isDirectory());
+
+    // file name is <entityId>.thist
+    String timelineServiceFileName =
+        appId.toString()
+            + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    File appFile = new File(outputDirApp, timelineServiceFileName);
+    Assert.assertTrue(appFile.exists());
+    Assert.assertEquals("Expected 3 events to be published", 3,
+        getNumOfNonEmptyLines(appFile));
+  }
+
+  @Test(timeout = 10000)
+  public void testPublishAppAttemptMetrics() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    RMApp app = rmAppsMapInContext.get(appId);
+    if (app == null) {
+      app = createAppAndRegister(appId);
+    }
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
+    metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
+    when(app.getFinalApplicationStatus()).thenReturn(
+        FinalApplicationStatus.UNDEFINED);
+    metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
+        app, Integer.MAX_VALUE + 2L);
+
+    dispatcher.await();
+
+    String outputDirApp =
+        getTimelineEntityDir(app) + "/"
+            + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
+
+    File entityFolder = new File(outputDirApp);
+    Assert.assertTrue(entityFolder.isDirectory());
+
+    // file name is <entityId>.thist
+    String timelineServiceFileName =
+        appAttemptId.toString()
+            + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    File appFile = new File(outputDirApp, timelineServiceFileName);
+    Assert.assertTrue(appFile.exists());
+    Assert.assertEquals("Expected 2 events to be published", 2,
+        getNumOfNonEmptyLines(appFile));
+  }
+
+  @Test(timeout = 10000)
+  public void testPublishContainerMetrics() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    RMApp app = rmAppsMapInContext.get(appId);
+    if (app == null) {
+      app = createAppAndRegister(appId);
+    }
+    ContainerId containerId =
+        ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+            appId, 1), 1);
+    RMContainer container = createRMContainer(containerId);
+    metricsPublisher.containerCreated(container, container.getCreationTime());
+    metricsPublisher.containerFinished(container, container.getFinishTime());
+    dispatcher.await();
+
+    String outputDirApp =
+        getTimelineEntityDir(app) + "/"
+            + TimelineEntityType.YARN_CONTAINER + "/";
+
+    File entityFolder = new File(outputDirApp);
+    Assert.assertTrue(entityFolder.isDirectory());
+
+    // file name is <entityId>.thist
+    String timelineServiceFileName =
+        containerId.toString()
+            + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    File appFile = new File(outputDirApp, timelineServiceFileName);
+    Assert.assertTrue(appFile.exists());
+    Assert.assertEquals("Expected 2 events to be published", 2,
+        getNumOfNonEmptyLines(appFile));
+  }
+
+  private RMApp createAppAndRegister(ApplicationId appId) {
+    RMApp app = createRMApp(appId);
+
+    // replicate the registration steps that RMAppImpl normally takes care of
+    rmAppsMapInContext.putIfAbsent(appId, app);
+    AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId);
+    rmTimelineCollectorManager.putIfAbsent(appId, collector);
+    return app;
+  }
+
+  private long getNumOfNonEmptyLines(File entityFile) throws IOException {
+    BufferedReader reader = null;
+    String strLine;
+    long count = 0;
+    try {
+      reader = new BufferedReader(new FileReader(entityFile));
+      while ((strLine = reader.readLine()) != null) {
+        if (strLine.trim().length() > 0)
+          count++;
+      }
+    } finally {
+      if (reader != null) { reader.close(); }
+    }
+    return count;
+  }
+
+  private String getTimelineEntityDir(RMApp app) {
+    String outputDirApp =
+        testRootDir.getAbsolutePath()+"/"
+            + FileSystemTimelineWriterImpl.ENTITIES_DIR
+            + "/"
+            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID
+            + "/"
+            + app.getUser()
+            + "/"
+            + TimelineUtils.generateDefaultFlowIdBasedOnAppId(app
+                .getApplicationId()) + "/" + DEFAULT_FLOW_VERSION + "/"
+            + DEFAULT_FLOW_RUN + "/" + app.getApplicationId();
+    return outputDirApp;
+  }
+
+  private static RMApp createRMApp(ApplicationId appId) {
+    RMApp app = mock(RMAppImpl.class);
+    when(app.getApplicationId()).thenReturn(appId);
+    when(app.getName()).thenReturn("test app");
+    when(app.getApplicationType()).thenReturn("test app type");
+    when(app.getUser()).thenReturn("testUser");
+    when(app.getQueue()).thenReturn("test queue");
+    when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
+    when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
+    when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
+    when(app.getDiagnostics()).thenReturn(
+        new StringBuilder("test diagnostics info"));
+    RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+    when(appAttempt.getAppAttemptId()).thenReturn(
+        ApplicationAttemptId.newInstance(appId, 1));
+    when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
+    when(app.getFinalApplicationStatus()).thenReturn(
+        FinalApplicationStatus.UNDEFINED);
+    when(app.getRMAppMetrics()).thenReturn(
+        new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
+    when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
+    return app;
+  }
+
+  private static RMAppAttempt createRMAppAttempt(
+      ApplicationAttemptId appAttemptId) {
+    RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+    when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(appAttempt.getHost()).thenReturn("test host");
+    when(appAttempt.getRpcPort()).thenReturn(-100);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(
+        ContainerId.newContainerId(appAttemptId, 1));
+    when(appAttempt.getMasterContainer()).thenReturn(container);
+    when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
+    when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
+    when(appAttempt.getOriginalTrackingUrl()).thenReturn(
+        "test original tracking url");
+    return appAttempt;
+  }
+
+  private static RMContainer createRMContainer(ContainerId containerId) {
+    RMContainer container = mock(RMContainer.class);
+    when(container.getContainerId()).thenReturn(containerId);
+    when(container.getAllocatedNode()).thenReturn(
+        NodeId.newInstance("test host", -100));
+    when(container.getAllocatedResource()).thenReturn(
+        Resource.newInstance(-1, -1));
+    when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
+    when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L);
+    when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L);
+    when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
+    when(container.getContainerExitStatus()).thenReturn(-1);
+    when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
+    Container mockContainer = mock(Container.class);
+    when(container.getContainer()).thenReturn(mockContainer);
+    when(mockContainer.getNodeHttpAddress())
+      .thenReturn("http://localhost:1234");
+    return container;
+  }
+}
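
For orientation, the directory layout that getTimelineEntityDir() above builds (and that the assertions check) for FileSystemTimelineWriterImpl is roughly the following; the angle-bracketed parts are placeholders, not literal values mandated by the patch:

    <storage-root>/entities/<clusterId>/<user>/<flowId>/<flowVersion>/<flowRun>/<appId>/<entityType>/<entityId>.thist

With the defaults used in this test (flow version "1", flow run 1), the application entity written by testPublishApplicationMetrics ends up under .../<appId>/YARN_APPLICATION/<appId>.thist.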

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 1fc7651..2fff98d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -55,7 +55,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
   public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
       = "/tmp/timeline_service_data";
 
-  private static final String ENTITIES_DIR = "entities";
+  public static final String ENTITIES_DIR = "entities";
 
   /** Default extension for output files. */
   public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
@@ -76,7 +76,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     return response;
   }
 
-  private void write(String clusterId, String userId, String flowName,
+  private synchronized void write(String clusterId, String userId, String flowName,
       String flowVersion, long flowRun, String appId, TimelineEntity entity,
       TimelineWriteResponse response) throws IOException {
     PrintWriter out = null;


[2/2] hadoop git commit: YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.

Posted by zj...@apache.org.
YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/17842a3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/17842a3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/17842a3f

Branch: refs/heads/YARN-2928
Commit: 17842a3f61ed33ec831a889c11a74d4814be73ec
Parents: 0a3c147
Author: Zhijie Shen <zj...@apache.org>
Authored: Sat Jun 13 11:32:41 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Sat Jun 13 11:32:41 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../records/timelineservice/TimelineEntity.java |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +
 .../distributedshell/TestDistributedShell.java  | 118 +++--
 .../metrics/ContainerMetricsConstants.java      |  10 +
 .../server/resourcemanager/ResourceManager.java |   2 +-
 .../AbstractTimelineServicePublisher.java       | 179 ++++++++
 .../metrics/ApplicationFinishedEvent.java       |  13 +-
 .../metrics/SystemMetricsPublisher.java         | 426 ++++---------------
 .../metrics/TimelineServiceV1Publisher.java     | 265 ++++++++++++
 .../metrics/TimelineServiceV2Publisher.java     | 296 +++++++++++++
 .../server/resourcemanager/rmapp/RMAppImpl.java |   2 -
 .../TestRMAppLogAggregationStatus.java          |   2 +-
 .../metrics/TestSystemMetricsPublisher.java     |   8 +-
 .../TestSystemMetricsPublisherForV2.java        | 374 ++++++++++++++++
 .../storage/FileSystemTimelineWriterImpl.java   |   4 +-
 16 files changed, 1321 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ab0dcb9..7b4b5d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3721. build is broken on YARN-2928 branch due to possible dependency
     cycle (Li Lu via sjlee)
 
+    YARN-3044. Made RM write app, attempt and optional container lifecycle
+    events to timeline service v2. (Naganarasimha G R via zjshen)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index defadec..a641f32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -470,4 +470,7 @@ public class TimelineEntity {
     return real == null ? this : real;
   }
 
+  public String toString() {
+    return identifier.toString();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e4ae2b7..2f7892c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -378,6 +378,16 @@ public class YarnConfiguration extends Configuration {
       + "system-metrics-publisher.enabled";
   public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
 
+  /**
+   * The setting that controls whether YARN container metrics are published to
+   * the timeline server by the RM. This configuration setting is for ATS
+   * v2.
+   */
+  public static final String RM_PUBLISH_CONTAINER_METRICS_ENABLED = YARN_PREFIX
+      + "rm.system-metrics-publisher.emit-container-events";
+  public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED =
+      false;
+
   public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
   public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
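
A minimal sketch of an RM-side configuration that exercises this code path, mirroring getTimelineV2Conf() in the new test above; the class name and the chosen values (pool size 2, container events on) are illustrative, not defaults introduced by the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class TimelineV2PublisherConfSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Enable the timeline service and the system metrics publisher.
        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
        conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
        // Optionally let the RM also emit per-container lifecycle events
        // (disabled by default, see DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED).
        conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, true);
        // Pool size for the publisher's multi-threaded dispatcher.
        conf.setInt(
            YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
      }
    }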

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index b887c77..7883f1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -52,13 +52,14 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -107,6 +108,7 @@ public class TestDistributedShell {
       conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
         + ".class", PerNodeTimelineCollectorsAuxService.class.getName());
     }
+    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
     conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     // Enable ContainersMonitorImpl
@@ -373,48 +375,96 @@ public class TestDistributedShell {
               "/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
               appId.toString();
       // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
-      String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
-
-      File entityFolder = new File(outputDirApp);
-      Assert.assertTrue(entityFolder.isDirectory());
 
+      // Verify DS_APP_ATTEMPT entities posted by the client
       // there will be at least one attempt, look for that file
-      String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
-          + "_000" + appId.getId() + "_000001"
-          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      String appAttemptFileName = outputDirApp + appTimestampFileName;
-      File appAttemptFile = new File(appAttemptFileName);
-      Assert.assertTrue(appAttemptFile.exists());
-
-      String outputDirContainer = basePath + "/DS_CONTAINER/";
-      File containerFolder = new File(outputDirContainer);
-      Assert.assertTrue(containerFolder.isDirectory());
-
-      String containerTimestampFileName = "container_"
-          + appId.getClusterTimestamp() + "_000" + appId.getId()
-          + "_01_000002.thist";
-      String containerFileName = outputDirContainer + containerTimestampFileName;
-      File containerFile = new File(containerFileName);
-      Assert.assertTrue(containerFile.exists());
+      String appTimestampFileName =
+          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_000001"
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
+          appTimestampFileName);
+
+      // Verify DS_CONTAINER entities posted by the client
+      String containerTimestampFileName =
+          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_01_000002.thist";
+      verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
+          containerTimestampFileName);
 
       // Verify NM posting container metrics info.
-      String outputDirContainerMetrics = basePath + "/" + 
-          TimelineEntityType.YARN_CONTAINER + "/";
-      File containerMetricsFolder = new File(outputDirContainerMetrics);
-      Assert.assertTrue(containerMetricsFolder.isDirectory());
+      String containerMetricsTimestampFileName =
+          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_01_000001"
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      verifyEntityTypeFileExists(basePath,
+          TimelineEntityType.YARN_CONTAINER.toString(),
+          containerMetricsTimestampFileName);
+
+      // Verify that application lifecycle events posted by the RM are published
+      String appMetricsTimestampFileName =
+          "application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File appEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION.toString(),
+              appMetricsTimestampFileName);
+      verifyStringExistsSpecifiedTimes(appEntityFile,
+          ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1,
+          "Application created event should be published atleast once");
+      verifyStringExistsSpecifiedTimes(appEntityFile,
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1,
+          "Application finished event should be published atleast once");
+
+      // Verify that app attempt lifecycle events posted by the RM are published
+      String appAttemptMetricsTimestampFileName =
+          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+              + "_000001"
+              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      File appAttemptEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+              appAttemptMetricsTimestampFileName);
+      verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
+          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1,
+          "AppAttempt register event should be published atleast once");
+      verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
+          AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1,
+          "AppAttempt finished event should be published atleast once");
+    } finally {
+      FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+    }
+  }
 
-      String containerMetricsTimestampFileName = "container_"
-        + appId.getClusterTimestamp() + "_000" + appId.getId()
-        + "_01_000001.thist";
-      String containerMetricsFileName = outputDirContainerMetrics + 
-        containerMetricsTimestampFileName;
+  private File verifyEntityTypeFileExists(String basePath, String entityType,
+      String entityfileName) {
+    String outputDirPathForEntity = basePath + "/" + entityType + "/";
+    File outputDirForEntity = new File(outputDirPathForEntity);
+    Assert.assertTrue(outputDirForEntity.isDirectory());
 
-      File containerMetricsFile = new File(containerMetricsFileName);
-      Assert.assertTrue(containerMetricsFile.exists());
+    String entityFilePath = outputDirPathForEntity + entityfileName;
 
+    File entityFile = new File(entityFilePath);
+    Assert.assertTrue(entityFile.exists());
+    return entityFile;
+  }
+
+  private void verifyStringExistsSpecifiedTimes(File entityFile,
+      String searchString, long expectedNumOfTimes, String errorMsg)
+      throws IOException {
+    BufferedReader reader = null;
+    String strLine;
+    long actualCount = 0;
+    try {
+      reader = new BufferedReader(new FileReader(entityFile));
+      while ((strLine = reader.readLine()) != null) {
+        if (strLine.trim().contains(searchString))
+          actualCount++;
+      }
     } finally {
-      FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+      if (reader != null) { reader.close(); }
     }
+    Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 0d5540d..7b42994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -27,10 +27,20 @@ public class ContainerMetricsConstants {
 
   public static final String ENTITY_TYPE = "YARN_CONTAINER";
 
+  // Event of this type will be emitted by NM.
   public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED";
 
+  // Event of this type will be emitted by RM.
+  public static final String CREATED_IN_RM_EVENT_TYPE =
+      "YARN_RM_CONTAINER_CREATED";
+
+  // Event of this type will be emitted by NM.
   public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
 
+  // Event of this type will be emitted by RM.
+  public static final String FINISHED_IN_RM_EVENT_TYPE =
+      "YARN_RM_CONTAINER_FINISHED";
+
   public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT";
 
   public static final String ALLOCATED_MEMORY_ENTITY_INFO =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 0feb227..43c2021 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -360,7 +360,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
-    return new SystemMetricsPublisher(); 
+    return new SystemMetricsPublisher(rmContext);
   }
 
   // sanity check for configurations

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
new file mode 100644
index 0000000..0d66327
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher;
+
+public abstract class AbstractTimelineServicePublisher extends CompositeService
+    implements TimelineServicePublisher, EventHandler<SystemMetricsEvent> {
+
+  private static final Log LOG = LogFactory
+      .getLog(TimelineServiceV2Publisher.class);
+
+  private Configuration conf;
+
+  public AbstractTimelineServicePublisher(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(SystemMetricsEvent event) {
+    switch (event.getType()) {
+    case APP_CREATED:
+      publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
+      break;
+    case APP_FINISHED:
+      publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
+      break;
+    case APP_ACLS_UPDATED:
+      publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
+      break;
+    case APP_ATTEMPT_REGISTERED:
+      publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
+      break;
+    case APP_ATTEMPT_FINISHED:
+      publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
+      break;
+    case CONTAINER_CREATED:
+      publishContainerCreatedEvent((ContainerCreatedEvent) event);
+      break;
+    case CONTAINER_FINISHED:
+      publishContainerFinishedEvent((ContainerFinishedEvent) event);
+      break;
+    default:
+      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
+    }
+  }
+
+  abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event);
+
+  abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event);
+
+  abstract void publishApplicationACLsUpdatedEvent(
+      ApplicationACLsUpdatedEvent event);
+
+  abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event);
+
+  abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event);
+
+  abstract void publishContainerCreatedEvent(ContainerCreatedEvent event);
+
+  abstract void publishContainerFinishedEvent(ContainerFinishedEvent event);
+
+  @Override
+  public Dispatcher getDispatcher() {
+    MultiThreadedDispatcher dispatcher =
+        new MultiThreadedDispatcher(
+            conf.getInt(
+                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
+    dispatcher.setDrainEventsOnStop();
+    return dispatcher;
+  }
+
+  @Override
+  public boolean publishRMContainerMetrics() {
+    return true;
+  }
+
+  @Override
+  public EventHandler<SystemMetricsEvent> getEventHandler() {
+    return this;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static class MultiThreadedDispatcher extends CompositeService
+      implements Dispatcher {
+
+    private List<AsyncDispatcher> dispatchers =
+        new ArrayList<AsyncDispatcher>();
+
+    public MultiThreadedDispatcher(int num) {
+      super(MultiThreadedDispatcher.class.getName());
+      for (int i = 0; i < num; ++i) {
+        AsyncDispatcher dispatcher = createDispatcher();
+        dispatchers.add(dispatcher);
+        addIfService(dispatcher);
+      }
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return new CompositEventHandler();
+    }
+
+    @Override
+    public void register(Class<? extends Enum> eventType, EventHandler handler) {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.register(eventType, handler);
+      }
+    }
+
+    public void setDrainEventsOnStop() {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.setDrainEventsOnStop();
+      }
+    }
+
+    private class CompositEventHandler implements EventHandler<Event> {
+
+      @Override
+      public void handle(Event event) {
+        // Use hashCode (of ApplicationId) to dispatch the event to the child
+        // dispatcher, such that all the writing events of one application will
+        // be handled by one thread, so the scheduled order of these events
+        // will be preserved.
+        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
+        dispatchers.get(index).getEventHandler().handle(event);
+      }
+    }
+
+    protected AsyncDispatcher createDispatcher() {
+      return new AsyncDispatcher();
+    }
+  }
+}
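
A minimal, runnable sketch of the modulo partitioning described in the CompositEventHandler comment above; the application id string and pool size are made up for illustration:

    public class DispatcherPartitionSketch {
      public static void main(String[] args) {
        int poolSize = 2;
        // SystemMetricsEvent subclasses return the ApplicationId's hashCode()
        // (see ApplicationFinishedEvent below), so every event of a given
        // application lands on the same child dispatcher, preserving order.
        int appEventHash = "application_1434220361000_0001".hashCode();
        int index = (appEventHash & Integer.MAX_VALUE) % poolSize;
        System.out.println("Events of this application go to dispatcher #" + index);
      }
    }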

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
index 8d75f92..d9241b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 
 public class ApplicationFinishedEvent extends
@@ -33,6 +35,7 @@ public class ApplicationFinishedEvent extends
   private YarnApplicationState state;
   private ApplicationAttemptId latestAppAttemptId;
   private RMAppMetrics appMetrics;
+  private RMAppImpl app;
 
   public ApplicationFinishedEvent(
       ApplicationId appId,
@@ -41,14 +44,16 @@ public class ApplicationFinishedEvent extends
       YarnApplicationState state,
       ApplicationAttemptId latestAppAttemptId,
       long finishedTime,
-      RMAppMetrics appMetrics) {
+      RMAppMetrics appMetrics,
+      RMAppImpl app) {
     super(SystemMetricsEventType.APP_FINISHED, finishedTime);
     this.appId = appId;
     this.diagnosticsInfo = diagnosticsInfo;
     this.appStatus = appStatus;
     this.latestAppAttemptId = latestAppAttemptId;
     this.state = state;
-    this.appMetrics=appMetrics;
+    this.appMetrics = appMetrics;
+    this.app = app;
   }
 
   @Override
@@ -56,6 +61,10 @@ public class ApplicationFinishedEvent extends
     return appId.hashCode();
   }
 
+  public RMAppImpl getApp() {
+    return app;
+  }
+
   public ApplicationId getApplicationId() {
     return appId;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index 63461b5..629636b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.metrics;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,31 +25,23 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
- * The class that helps RM publish metrics to the timeline server V1. RM will
+ * The class that helps RM publish metrics to the timeline server. RM will
  * always invoke the methods of this class regardless of whether the service is enabled or
  * not. If it is disabled, publishing requests will be ignored silently.
  */
@@ -65,30 +53,38 @@ public class SystemMetricsPublisher extends CompositeService {
       .getLog(SystemMetricsPublisher.class);
 
   private Dispatcher dispatcher;
-  private TimelineClient client;
-  private boolean publishSystemMetricsToATSv1;
+  private boolean publishSystemMetrics;
+  private boolean publishContainerMetrics;
+  protected RMContext rmContext;
 
-  public SystemMetricsPublisher() {
+  public SystemMetricsPublisher(RMContext rmContext) {
     super(SystemMetricsPublisher.class.getName());
+    this.rmContext = rmContext;
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetricsToATSv1 =
+    publishSystemMetrics =
         conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
-            && conf.getBoolean(
-                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
-
-    if (publishSystemMetricsToATSv1) {
-      client = TimelineClient.createTimelineClient();
-      addIfService(client);
-
-      dispatcher = createDispatcher(conf);
-      dispatcher.register(SystemMetricsEventType.class,
-          new ForwardingEventHandler());
-      addIfService(dispatcher);
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+    if (publishSystemMetrics) {
+      TimelineServicePublisher timelineServicePublisher =
+          getTimelineServicePublisher(conf);
+      if (timelineServicePublisher != null) {
+        addService(timelineServicePublisher);
+        // init needs to be called so that the other methods of
+        // TimelineServicePublisher can be used
+        timelineServicePublisher.init(conf);
+        dispatcher = createDispatcher(timelineServicePublisher);
+        publishContainerMetrics =
+            timelineServicePublisher.publishRMContainerMetrics();
+        dispatcher.register(SystemMetricsEventType.class,
+            timelineServicePublisher.getEventHandler());
+        addIfService(dispatcher);
+      } else {
+        LOG.info("TimelineServicePublisher is not configured");
+        publishSystemMetrics = false;
+      }
       LOG.info("YARN system metrics publishing service is enabled");
     } else {
       LOG.info("YARN system metrics publishing service is not enabled");
@@ -96,9 +92,26 @@ public class SystemMetricsPublisher extends CompositeService {
     super.serviceInit(conf);
   }
 
+  @VisibleForTesting
+  Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
+    return timelineServicePublisher.getDispatcher();
+  }
+
+  TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
+    if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
+      return new TimelineServiceV1Publisher();
+    } else if (conf.getBoolean(
+        YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
+      return new TimelineServiceV2Publisher(rmContext);
+    }
+    return null;
+  }
+
   @SuppressWarnings("unchecked")
   public void appCreated(RMApp app, long createdTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new ApplicationCreatedEvent(
               app.getApplicationId(),
@@ -113,7 +126,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void appFinished(RMApp app, RMAppState state, long finishedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new ApplicationFinishedEvent(
               app.getApplicationId(),
@@ -123,14 +136,15 @@ public class SystemMetricsPublisher extends CompositeService {
               app.getCurrentAppAttempt() == null ?
                   null : app.getCurrentAppAttempt().getAppAttemptId(),
               finishedTime,
-              app.getRMAppMetrics()));
+              app.getRMAppMetrics(),
+              (RMAppImpl)app));
     }
   }
 
   @SuppressWarnings("unchecked")
   public void appACLsUpdated(RMApp app, String appViewACLs,
       long updatedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new ApplicationACLsUpdatedEvent(
               app.getApplicationId(),
@@ -142,7 +156,7 @@ public class SystemMetricsPublisher extends CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptRegistered(RMAppAttempt appAttempt,
       long registeredTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new AppAttemptRegisteredEvent(
               appAttempt.getAppAttemptId(),
@@ -158,7 +172,7 @@ public class SystemMetricsPublisher extends CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptFinished(RMAppAttempt appAttempt,
       RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new AppAttemptFinishedEvent(
               appAttempt.getAppAttemptId(),
@@ -175,7 +189,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerCreated(RMContainer container, long createdTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishContainerMetrics) {
       dispatcher.getEventHandler().handle(
           new ContainerCreatedEvent(
               container.getContainerId(),
@@ -188,7 +202,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerFinished(RMContainer container, long finishedTime) {
-    if (publishSystemMetricsToATSv1) {
+    if (publishContainerMetrics) {
       dispatcher.getEventHandler().handle(
           new ContainerFinishedEvent(
               container.getContainerId(),
@@ -199,317 +213,31 @@ public class SystemMetricsPublisher extends CompositeService {
     }
   }
 
-  protected Dispatcher createDispatcher(Configuration conf) {
-    MultiThreadedDispatcher dispatcher =
-        new MultiThreadedDispatcher(
-            conf.getInt(
-                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
-                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
-    dispatcher.setDrainEventsOnStop();
-    return dispatcher;
-  }
-
-  protected void handleSystemMetricsEvent(
-      SystemMetricsEvent event) {
-    switch (event.getType()) {
-      case APP_CREATED:
-        publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
-        break;
-      case APP_FINISHED:
-        publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
-        break;
-      case APP_ACLS_UPDATED:
-        publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
-        break;
-      case APP_ATTEMPT_REGISTERED:
-        publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
-        break;
-      case APP_ATTEMPT_FINISHED:
-        publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
-        break;
-      case CONTAINER_CREATED:
-        publishContainerCreatedEvent((ContainerCreatedEvent) event);
-        break;
-      case CONTAINER_FINISHED:
-        publishContainerFinishedEvent((ContainerFinishedEvent) event);
-        break;
-      default:
-        LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
-    }
-  }
-
-  private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
-    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
-    entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
-    entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
-    entity.setOtherInfo(entityInfo);
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    entity.addEvent(tEvent);
-    putEntity(entity);
+  @VisibleForTesting
+  boolean isPublishContainerMetrics() {
+    return publishContainerMetrics;
   }
 
-  private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
-        event.getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
-        event.getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
-      eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
-    }
-    RMAppMetrics appMetrics = event.getAppMetrics();
-    entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
-        appMetrics.getVcoreSeconds());
-    entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
-        appMetrics.getMemorySeconds());
-    
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private void publishApplicationACLsUpdatedEvent(
-      ApplicationACLsUpdatedEvent event) {
-    TimelineEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    TimelineEvent tEvent = new TimelineEvent();
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
-    entity.setOtherInfo(entityInfo);
-    tEvent.setEventType(
-        ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createApplicationEntity(
-      ApplicationId applicationId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(applicationId.toString());
-    return entity;
-  }
-
-  private void
-      publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
-    TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(
-        AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(
-        AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(
-        AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
-        event.getHost());
-    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    eventInfo.put(
-        AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
-        event.getMasterContainerId().toString());
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
-    TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(
-        AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(
-        AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
-        event.getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
-        event.getYarnApplicationAttemptState().toString());
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createAppAttemptEntity(
-      ApplicationAttemptId appAttemptId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(
-        AppAttemptMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(appAttemptId.toString());
-    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
-        appAttemptId.getApplicationId().toString());
-    return entity;
-  }
-
-  private void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
-        event.getAllocatedResource().getMemory());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
-        event.getAllocatedResource().getVirtualCores());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
-        event.getAllocatedNode().getHost());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
-        event.getAllocatedNode().getPort());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
-    entityInfo.put(
-      ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-      event.getNodeHttpAddress());
-    entity.setOtherInfo(entityInfo);
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
-        event.getContainerState().toString());
-    tEvent.setEventInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  private static TimelineEntity createContainerEntity(
-      ContainerId containerId) {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setEntityType(
-        ContainerMetricsConstants.ENTITY_TYPE);
-    entity.setEntityId(containerId.toString());
-    entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
-        containerId.getApplicationAttemptId().toString());
-    return entity;
-  }
-
-  private void putEntity(TimelineEntity entity) {
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Publishing the entity " + entity.getEntityId() +
-            ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-      }
-      client.putEntities(entity);
-    } catch (Exception e) {
-      LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
-          + entity.getEntityId() + "]", e);
-    }
+  @VisibleForTesting
+  Dispatcher getDispatcher() {
+    return dispatcher;
   }
 
-  /**
-   * EventHandler implementation which forward events to SystemMetricsPublisher.
-   * Making use of it, SystemMetricsPublisher can avoid to have a public handle
-   * method.
-   */
-  private final class ForwardingEventHandler implements
-      EventHandler<SystemMetricsEvent> {
+  interface TimelineServicePublisher extends Service {
+    /**
+     * @return the Dispatcher which needs to be used to dispatch events
+     */
+    Dispatcher getDispatcher();
 
-    @Override
-    public void handle(SystemMetricsEvent event) {
-      handleSystemMetricsEvent(event);
-    }
+    /**
+     * @return true if RM container metrics need to be sent
+     */
+    boolean publishRMContainerMetrics();
 
+    /**
+     * @return EventHandler which needs to be registered with the dispatcher to
+     *         handle the SystemMetricsEvent
+     */
+    EventHandler<SystemMetricsEvent> getEventHandler();
   }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected static class MultiThreadedDispatcher extends CompositeService
-      implements Dispatcher {
-
-    private List<AsyncDispatcher> dispatchers =
-        new ArrayList<AsyncDispatcher>();
-
-    public MultiThreadedDispatcher(int num) {
-      super(MultiThreadedDispatcher.class.getName());
-      for (int i = 0; i < num; ++i) {
-        AsyncDispatcher dispatcher = createDispatcher();
-        dispatchers.add(dispatcher);
-        addIfService(dispatcher);
-      }
-    }
-
-    @Override
-    public EventHandler getEventHandler() {
-      return new CompositEventHandler();
-    }
-
-    @Override
-    public void register(Class<? extends Enum> eventType, EventHandler handler) {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.register(eventType, handler);
-      }
-    }
-
-    public void setDrainEventsOnStop() {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.setDrainEventsOnStop();
-      }
-    }
-
-    private class CompositEventHandler implements EventHandler<Event> {
-
-      @Override
-      public void handle(Event event) {
-        // Use hashCode (of ApplicationId) to dispatch the event to the child
-        // dispatcher, such that all the writing events of one application will
-        // be handled by one thread, the scheduled order of the these events
-        // will be preserved
-        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
-        dispatchers.get(index).getEventHandler().handle(event);
-      }
-
-    }
-
-    protected AsyncDispatcher createDispatcher() {
-      return new AsyncDispatcher();
-    }
-
-  }
-
 }

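With this refactoring, SystemMetricsPublisher only forwards lifecycle calls onto a
dispatcher; the actual writing is delegated to whichever TimelineServicePublisher
implementation is wired in (v1 or v2). Below is a minimal sketch of a custom back
end against the interface above. It is illustrative only and not part of the patch:
it assumes the class lives in the same org.apache.hadoop.yarn.server.resourcemanager.metrics
package (the interface is package-private), that a single AsyncDispatcher is
acceptable, and LoggingTimelineServicePublisher is a made-up name.

    package org.apache.hadoop.yarn.server.resourcemanager.metrics;

    import org.apache.hadoop.service.CompositeService;
    import org.apache.hadoop.yarn.event.AsyncDispatcher;
    import org.apache.hadoop.yarn.event.Dispatcher;
    import org.apache.hadoop.yarn.event.EventHandler;

    class LoggingTimelineServicePublisher extends CompositeService
        implements SystemMetricsPublisher.TimelineServicePublisher {

      // one async dispatcher; the composite service manages its lifecycle
      private final AsyncDispatcher dispatcher = new AsyncDispatcher();

      LoggingTimelineServicePublisher() {
        super(LoggingTimelineServicePublisher.class.getName());
        addIfService(dispatcher);
      }

      @Override
      public Dispatcher getDispatcher() {
        // SystemMetricsPublisher is expected to register handlers on this
        return dispatcher;
      }

      @Override
      public boolean publishRMContainerMetrics() {
        // no container lifecycle events from this back end
        return false;
      }

      @Override
      public EventHandler<SystemMetricsEvent> getEventHandler() {
        return new EventHandler<SystemMetricsEvent>() {
          @Override
          public void handle(SystemMetricsEvent event) {
            // a real back end would translate the event into timeline writes
            System.out.println("system metrics event: " + event.getType());
          }
        };
      }
    }

Having each back end hand its own Dispatcher to SystemMetricsPublisher lets the
back end choose its threading model without the front end knowing about it.
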
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
new file mode 100644
index 0000000..c4a85b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+public class TimelineServiceV1Publisher extends
+    AbstractTimelineServicePublisher {
+
+  private static final Log LOG = LogFactory
+      .getLog(TimelineServiceV1Publisher.class);
+
+  public TimelineServiceV1Publisher() {
+    super("TimelineserviceV1Publisher");
+  }
+
+  private TimelineClient client;
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    client = TimelineClient.createTimelineClient();
+    addIfService(client);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
+    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
+        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+        event.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
+        event.getUser());
+    entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
+        event.getQueue());
+    entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
+        event.getSubmittedTime());
+    entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
+        event.getAppTags());
+    entity.setOtherInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
+    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationState().toString());
+    if (event.getLatestApplicationAttemptId() != null) {
+      eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
+          event.getLatestApplicationAttemptId().toString());
+    }
+    RMAppMetrics appMetrics = event.getAppMetrics();
+    entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
+        appMetrics.getVcoreSeconds());
+    entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
+        appMetrics.getMemorySeconds());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
+    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+        event.getViewAppACLs());
+    entity.setOtherInfo(entityInfo);
+    tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  private static TimelineEntity createApplicationEntity(
+      ApplicationId applicationId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(applicationId.toString());
+    return entity;
+  }
+
+  @Override
+  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+    TimelineEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
+        event.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
+        .getMasterContainerId().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+    TimelineEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationAttemptState().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  private static TimelineEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(appAttemptId.toString());
+    entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
+        appAttemptId.getApplicationId().toString());
+    return entity;
+  }
+
+  @Override
+  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+        event.getAllocatedResource().getMemory());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
+        .getAllocatedResource().getVirtualCores());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
+        .getAllocatedNode().getHost());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
+        .getAllocatedNode().getPort());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+        event.getAllocatedPriority().getPriority());
+    entityInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+        event.getNodeHttpAddress());
+    entity.setOtherInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  @Override
+  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+        event.getContainerExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
+        .getContainerState().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity);
+  }
+
+  private static TimelineEntity createContainerEntity(ContainerId containerId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE);
+    entity.setEntityId(containerId.toString());
+    entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
+        containerId.getApplicationAttemptId().toString());
+    return entity;
+  }
+
+  private void putEntity(TimelineEntity entity) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Publishing the entity " + entity.getEntityId()
+            + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+      client.putEntities(entity);
+    } catch (Exception e) {
+      LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+          + entity.getEntityId() + "]", e);
+    }
+  }
+}

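TimelineServiceV1Publisher keeps the existing ATS v1 wire format: every RM
lifecycle event becomes a TimelineEvent hung off a TimelineEntity and is pushed
through TimelineClient.putEntities. For reference, here is a self-contained
sketch of the same put pattern outside the RM; it assumes a reachable timeline
service v1 and uses a made-up application id, and it is not part of the patch.

    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
    import org.apache.hadoop.yarn.client.api.TimelineClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;

    public class TimelineV1PutSketch {
      public static void main(String[] args) throws Exception {
        TimelineClient client = TimelineClient.createTimelineClient();
        client.init(new YarnConfiguration());
        client.start();
        try {
          TimelineEntity entity = new TimelineEntity();
          entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
          // hypothetical application id, for illustration only
          entity.setEntityId("application_1434225000000_0001");

          TimelineEvent created = new TimelineEvent();
          created.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
          created.setTimestamp(System.currentTimeMillis());
          entity.addEvent(created);

          // same call the publisher's putEntity() helper ends up making
          client.putEntities(entity);
        } finally {
          client.stop();
        }
      }
    }
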
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
new file mode 100644
index 0000000..81ce54c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This class is responsible for posting application, application attempt and
+ * container lifecycle related events to timeline service v2.
+ */
+@Private
+@Unstable
+public class TimelineServiceV2Publisher extends
+    AbstractTimelineServicePublisher {
+  private static final Log LOG = LogFactory
+      .getLog(TimelineServiceV2Publisher.class);
+  protected RMTimelineCollectorManager rmTimelineCollectorManager;
+
+  public TimelineServiceV2Publisher(RMContext rmContext) {
+    super("TimelineserviceV2Publisher");
+    rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
+  }
+
+  private boolean publishContainerMetrics;
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    publishContainerMetrics =
+        conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+            YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
+    ApplicationEntity entity =
+        createApplicationEntity(event.getApplicationId());
+    entity.setQueue(event.getQueue());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
+        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+        event.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
+        event.getUser());
+    entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
+        event.getSubmittedTime());
+    entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
+        event.getAppTags());
+    entity.setInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    entity.addEvent(tEvent);
+
+    putEntity(entity, event.getApplicationId());
+  }
+
+  @Override
+  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
+    ApplicationEntity entity =
+        createApplicationEntity(event.getApplicationId());
+    RMAppMetrics appMetrics = event.getAppMetrics();
+    entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
+        appMetrics.getVcoreSeconds());
+    entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
+        appMetrics.getMemorySeconds());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationState().toString());
+    if (event.getLatestApplicationAttemptId() != null) {
+      eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
+          event.getLatestApplicationAttemptId().toString());
+    }
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getApplicationId());
+
+    // clean up the collector cached for this application
+    event.getApp().stopTimelineCollector();
+  }
+
+  @Override
+  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
+    ApplicationEntity entity =
+        createApplicationEntity(event.getApplicationId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+        event.getViewAppACLs());
+    entity.setInfo(entityInfo);
+
+    putEntity(entity, event.getApplicationId());
+  }
+
+  private static ApplicationEntity createApplicationEntity(
+      ApplicationId applicationId) {
+    ApplicationEntity entity = new ApplicationEntity();
+    entity.setId(applicationId.toString());
+    return entity;
+  }
+
+  @Override
+  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+    TimelineEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
+        event.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
+        .getMasterContainerId().toString());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+  }
+
+  @Override
+  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+    ApplicationAttemptEntity entity =
+        createAppAttemptEntity(event.getApplicationAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        event.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        event.getOriginalTrackingURL());
+    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
+        .getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
+        .getYarnApplicationAttemptState().toString());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+  }
+
+  @Override
+  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    // published as event info rather than entity info, since the entity info
+    // for containers is populated by the NM
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
+        .getAllocatedResource().getMemory());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
+        .getAllocatedResource().getVirtualCores());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
+        .getAllocatedNode().getHost());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
+        .getAllocatedNode().getPort());
+    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+        event.getAllocatedPriority().getPriority());
+    eventInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+        event.getNodeHttpAddress());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getContainerId().getApplicationAttemptId()
+        .getApplicationId());
+  }
+
+  @Override
+  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
+    TimelineEntity entity = createContainerEntity(event.getContainerId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        event.getDiagnosticsInfo());
+    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+        event.getContainerExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
+        .getContainerState().toString());
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    putEntity(entity, event.getContainerId().getApplicationAttemptId()
+        .getApplicationId());
+  }
+
+  private static ContainerEntity createContainerEntity(ContainerId containerId) {
+    ContainerEntity entity = new ContainerEntity();
+    entity.setId(containerId.toString());
+    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT
+        .name(), containerId.getApplicationAttemptId().toString()));
+    return entity;
+  }
+
+  private void putEntity(TimelineEntity entity, ApplicationId appId) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+      TimelineCollector timelineCollector =
+          rmTimelineCollectorManager.get(appId);
+      TimelineEntities entities = new TimelineEntities();
+      entities.addEntity(entity);
+      timelineCollector.putEntities(entities,
+          UserGroupInformation.getCurrentUser());
+    } catch (Exception e) {
+      LOG.error("Error when publishing entity " + entity, e);
+    }
+  }
+
+  private static ApplicationAttemptEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
+    entity.setId(appAttemptId.toString());
+    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
+        appAttemptId.getApplicationId().toString()));
+    return entity;
+  }
+
+  @Override
+  public boolean publishRMContainerMetrics() {
+    return publishContainerMetrics;
+  }
+}
\ No newline at end of file

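In the v2 publisher the entities are typed (ApplicationEntity,
ApplicationAttemptEntity, ContainerEntity), carry an explicit parent Identifier
instead of the v1 primary filter, and key their events with setId rather than
setEventType. A small sketch of the container-side shape follows, with made-up
ids; it only builds the entity and does not go through a TimelineCollector.

    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
    import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;

    public class TimelineV2EntitySketch {
      public static void main(String[] args) {
        // made-up ids, for illustration only
        ApplicationId appId = ApplicationId.newInstance(1434225000000L, 1);
        ApplicationAttemptId attemptId =
            ApplicationAttemptId.newInstance(appId, 1);
        ContainerId containerId = ContainerId.newContainerId(attemptId, 1);

        ContainerEntity entity = new ContainerEntity();
        entity.setId(containerId.toString());
        // the parent identifier ties the container to its attempt in v2
        entity.setParent(new Identifier(
            TimelineEntityType.YARN_APPLICATION_ATTEMPT.name(),
            attemptId.toString()));

        TimelineEvent created = new TimelineEvent();
        created.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
        created.setTimestamp(System.currentTimeMillis());
        entity.addEvent(created);

        System.out.println("container entity: " + entity.getId());
      }
    }

In the RM itself, putEntity() then hands the entity to the per-application
TimelineCollector looked up from RMTimelineCollectorManager, as shown above.
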
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index a68fc77..c2155da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1302,8 +1302,6 @@ public class RMAppImpl implements RMApp, Recoverable {
           .applicationFinished(app, finalState);
       app.rmContext.getSystemMetricsPublisher()
           .appFinished(app, finalState, app.finishTime);
-
-      app.stopTimelineCollector();
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 6db1745..13c2ab3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -90,7 +90,7 @@ public class TestRMAppLogAggregationStatus {
         new RMContextImpl(rmDispatcher, null, null, null,
           null, null, null, null, null,
           new RMApplicationHistoryWriter());
-    rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+    rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
 
     rmContext
         .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));

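Because the publisher now takes an RMContext and runs as a real service, tests
that only need a SystemMetricsPublisher hanging off the RMContext can simply
mock it, as this hunk does; tests that exercise the publishing path construct
the real object against a mocked RMContext, as TestSystemMetricsPublisher does
in the next hunk. A compact sketch of that second pattern, using the
configuration keys from that test (illustrative only, not part of the patch):

    package org.apache.hadoop.yarn.server.resourcemanager.metrics;

    import static org.mockito.Mockito.mock;

    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.resourcemanager.RMContext;

    public class PublisherSetupSketch {
      static SystemMetricsPublisher newStartedPublisher() {
        YarnConfiguration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
        conf.setBoolean(
            YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);

        // the constructor now needs an RMContext; a mock is enough here
        SystemMetricsPublisher publisher =
            new SystemMetricsPublisher(mock(RMContext.class));
        publisher.init(conf);
        publisher.start();
        return publisher;
      }
    }
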
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index b122bc4..41b7a8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -45,7 +45,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -71,7 +73,7 @@ public class TestSystemMetricsPublisher {
   public static void setup() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
         MemoryTimelineStore.class, TimelineStore.class);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
@@ -85,7 +87,7 @@ public class TestSystemMetricsPublisher {
     timelineServer.start();
     store = timelineServer.getTimelineStore();
 
-    metricsPublisher = new SystemMetricsPublisher();
+    metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
     metricsPublisher.init(conf);
     metricsPublisher.start();
   }
@@ -339,7 +341,7 @@ public class TestSystemMetricsPublisher {
   }
 
   private static RMApp createRMApp(ApplicationId appId) {
-    RMApp app = mock(RMApp.class);
+    RMApp app = mock(RMAppImpl.class);
     when(app.getApplicationId()).thenReturn(appId);
     when(app.getName()).thenReturn("test app");
     when(app.getApplicationType()).thenReturn("test app type");