You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/03/29 00:55:28 UTC

hadoop git commit: YARN-4711. NM is going down with NPE's due to single thread processing of events by Timeline client (Naganarasimha G R via sjlee)

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 bcbb52d8a -> f746c80b3


YARN-4711. NM is going down with NPE's due to single thread processing of events by Timeline client (Naganarasimha G R via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f746c80b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f746c80b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f746c80b

Branch: refs/heads/YARN-2928
Commit: f746c80b34e9acf2390c90eab2e8a4eb993e52b9
Parents: bcbb52d
Author: Sangjin Lee <sj...@apache.org>
Authored: Mon Mar 28 15:50:03 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Mon Mar 28 15:50:03 2016 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |  11 +-
 .../records/timelineservice/TimelineEntity.java |  25 ++-
 .../client/api/impl/TimelineClientImpl.java     |  35 ++--
 .../api/impl/TestTimelineClientV2Impl.java      |  91 +++++++-
 .../metrics/ContainerMetricsConstants.java      |   8 +
 .../nodemanager/NodeStatusUpdaterImpl.java      |  10 +-
 .../collectormanager/NMCollectorService.java    |  10 +-
 .../application/Application.java                |   4 -
 .../application/ApplicationImpl.java            |  24 +--
 .../timelineservice/NMTimelinePublisher.java    | 210 +++++++++++--------
 .../TestNMTimelinePublisher.java                |  24 +--
 .../yarn/server/nodemanager/webapp/MockApp.java |   5 -
 13 files changed, 281 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ab4c706..d71f7fd 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -244,6 +244,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4712. CPU Usage Metric is not captured properly in YARN-2928.
     (Naganarasimha G R via varunsaxena)
 
+    YARN-4711. NM is going down with NPE's due to single thread processing of
+    events by Timeline client (Naganarasimha G R via sjlee)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index bd6cca5..dbc5506 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -117,8 +117,15 @@
 
   <!-- Object cast is based on the event type -->
   <Match>
-    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
-     <Bug pattern="BC_UNCONFIRMED_CAST" />
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher" />
+    <Method name="publishApplicationEvent" />
+    <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
+
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher" />
+    <Method name="publishLocalizationEvent" />
+    <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
 
   <Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index acc132e..7ce8279 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -17,15 +17,6 @@
  */
 package org.apache.hadoop.yarn.api.records.timelineservice;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.util.TimelineServiceHelper;
-import org.codehaus.jackson.annotate.JsonSetter;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +24,16 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.codehaus.jackson.annotate.JsonSetter;
+
 /**
  * The basic timeline entity data structure for timeline service v2. Timeline
  * entity objects are not thread safe and should not be accessed concurrently.
@@ -564,6 +565,10 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
   }
 
   public String toString() {
-    return identifier.toString();
+    if (real == null) {
+      return identifier.toString();
+    } else {
+      return real.toString();
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index c8e6481..8a9e3e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -419,9 +419,8 @@ public class TimelineClientImpl extends TimelineClient {
         URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
         putObjects(uri, path, params, obj);
         needRetry = false;
-      } catch (Exception e) {
-        // TODO only handle exception for timelineServiceAddress being updated.
-        // skip retry for other exceptions.
+      } catch (IOException e) {
+        // handle exception for timelineServiceAddress being updated.
         checkRetryWithSleep(retries, e);
         retries--;
       }
@@ -448,29 +447,27 @@ public class TimelineClientImpl extends TimelineClient {
    * @param retries
    * @param e
    */
-  private void checkRetryWithSleep(int retries, Exception e) throws
-      YarnException, IOException {
+  private void checkRetryWithSleep(int retries, IOException e)
+      throws YarnException, IOException {
     if (retries > 0) {
       try {
         Thread.sleep(this.serviceRetryInterval);
       } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while retrying to connect to ATS");
       }
     } else {
-      LOG.error("TimelineClient has reached to max retry times :" +
-          this.maxServiceRetries + " for service address: " +
-          timelineServiceAddress);
-      if (e instanceof YarnException) {
-        throw (YarnException)e;
-      } else if (e instanceof IOException) {
-        throw (IOException)e;
-      } else {
-        throw new YarnException(e);
-      }
+      StringBuilder msg =
+          new StringBuilder("TimelineClient has reached to max retry times : ");
+      msg.append(this.maxServiceRetries);
+      msg.append(" for service address: ");
+      msg.append(timelineServiceAddress);
+      LOG.error(msg.toString());
+      throw new IOException(msg.toString(), e);
     }
   }
 
-  private void putObjects(
+  protected void putObjects(
       URI base, String path, MultivaluedMap<String, String> params, Object obj)
           throws IOException, YarnException {
     ClientResponse resp;
@@ -638,17 +635,19 @@ public class TimelineClientImpl extends TimelineClient {
 
   /**
    * Poll TimelineServiceAddress for maximum of retries times if it is null.
+   *
    * @param retries
    * @return the left retry times
+   * @throws YarnException if interrupted while waiting for the address
    */
-  private int pollTimelineServiceAddress(int retries) {
+  private int pollTimelineServiceAddress(int retries) throws YarnException {
     while (timelineServiceAddress == null && retries > 0) {
       try {
         Thread.sleep(this.serviceRetryInterval);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while trying to connect ATS");
       }
-      // timelineServiceAddress = getTimelineServiceAddress();
       retries--;
     }
     return retries;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
index 7803f94..71dafdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -34,23 +35,33 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 public class TestTimelineClientV2Impl {
   private static final Log LOG =
       LogFactory.getLog(TestTimelineClientV2Impl.class);
   private TestV2TimelineClient client;
   private static long TIME_TO_SLEEP = 150;
+  private static final String EXCEPTION_MSG = "Exception in the content";
 
   @Before
   public void setup() {
-    YarnConfiguration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
     conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
-    client = createTimelineClient(conf);
+    if (!currTestName.getMethodName()
+        .contains("testRetryOnConnectionFailure")) {
+      client = createTimelineClient(conf);
+    }
   }
 
+  @Rule
+  public TestName currTestName = new TestName();
+  private YarnConfiguration conf;
+
   private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
     ApplicationId id = ApplicationId.newInstance(0, 0);
     TestV2TimelineClient client = new TestV2TimelineClient(id);
@@ -59,9 +70,34 @@ public class TestTimelineClientV2Impl {
     return client;
   }
 
-  private class TestV2TimelineClient extends TimelineClientImpl {
+  private class TestV2TimelineClientForExceptionHandling
+      extends TimelineClientImpl {
+    public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
+      super(id);
+    }
+
+    protected boolean throwYarnException;
+
+    public void setThrowYarnException(boolean throwYarnException) {
+      this.throwYarnException = throwYarnException;
+    }
+
+    @Override
+    protected void putObjects(URI base, String path,
+        MultivaluedMap<String, String> params, Object obj)
+            throws IOException, YarnException {
+      if (throwYarnException) {
+        throw new YarnException(EXCEPTION_MSG);
+      } else {
+        throw new IOException(
+            "Failed to get the response from the timeline server.");
+      }
+    }
+  }
+
+  private class TestV2TimelineClient
+      extends TestV2TimelineClientForExceptionHandling {
     private boolean sleepBeforeReturn;
-    private boolean throwException;
 
     private List<TimelineEntities> publishedEntities;
 
@@ -75,10 +111,6 @@ public class TestTimelineClientV2Impl {
       this.sleepBeforeReturn = sleepBeforeReturn;
     }
 
-    public void setThrowException(boolean throwException) {
-      this.throwException = throwException;
-    }
-
     public int getNumOfTimelineEntitiesPublished() {
       return publishedEntities.size();
     }
@@ -91,7 +123,7 @@ public class TestTimelineClientV2Impl {
     protected void putObjects(String path,
         MultivaluedMap<String, String> params, Object obj)
             throws IOException, YarnException {
-      if (throwException) {
+      if (throwYarnException) {
         throw new YarnException("ActualException");
       }
       publishedEntities.add((TimelineEntities) obj);
@@ -106,6 +138,45 @@ public class TestTimelineClientV2Impl {
   }
 
   @Test
+  public void testExceptionMultipleRetry() {
+    TestV2TimelineClientForExceptionHandling client =
+        new TestV2TimelineClientForExceptionHandling(
+            ApplicationId.newInstance(0, 0));
+    int maxRetries = 2;
+    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+        maxRetries);
+    client.init(conf);
+    client.start();
+    client.setTimelineServiceAddress("localhost:12345");
+    try {
+      client.putEntities(new TimelineEntity());
+    } catch (IOException e) {
+      Assert.fail("YARN exception is expected");
+    } catch (YarnException e) {
+      Throwable cause = e.getCause();
+      Assert.assertTrue("IOException is expected",
+          cause instanceof IOException);
+      Assert.assertTrue("YARN exception is expected",
+          cause.getMessage().contains(
+              "TimelineClient has reached to max retry times : " + maxRetries));
+    }
+
+    client.setThrowYarnException(true);
+    try {
+      client.putEntities(new TimelineEntity());
+    } catch (IOException e) {
+      Assert.fail("YARN exception is expected");
+    } catch (YarnException e) {
+      Throwable cause = e.getCause();
+      Assert.assertTrue("YARN exception is expected",
+          cause instanceof YarnException);
+      Assert.assertTrue("YARN exception is expected",
+          cause.getMessage().contains(EXCEPTION_MSG));
+    }
+    client.stop();
+  }
+
+  @Test
   public void testPostEntities() throws Exception {
     try {
       client.putEntities(generateEntity("1"));
@@ -189,7 +260,7 @@ public class TestTimelineClientV2Impl {
 
   @Test
   public void testExceptionCalls() throws Exception {
-    client.setThrowException(true);
+    client.setThrowYarnException(true);
     try {
       client.putEntitiesAsync(generateEntity("1"));
     } catch (YarnException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 7b42994..eadb5b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -69,4 +69,12 @@ public class ContainerMetricsConstants {
 
   public static final String ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO =
       "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS";
+
+  // Event of this type will be emitted by NM.
+  public static final String LOCALIZATION_START_EVENT_TYPE =
+      "YARN_NM_CONTAINER_LOCALIZATION_STARTED";
+
+  // Event of this type will be emitted by NM.
+  public static final String LOCALIZATION_FINISHED_EVENT_TYPE =
+      "YARN_NM_CONTAINER_LOCALIZATION_FINISHED";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 20ca7f1..e07c2ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -84,6 +83,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
@@ -897,9 +897,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                 LOG.debug("Sync a new collector address: " + collectorAddr +
                     " for application: " + appId + " from RM.");
               }
-              TimelineClient client = application.getTimelineClient();
-              if (client != null) {
-                client.setTimelineServiceAddress(collectorAddr);
+              NMTimelinePublisher nmTimelinePublisher =
+                  context.getNMTimelinePublisher();
+              if (nmTimelinePublisher != null) {
+                nmTimelinePublisher.setTimelineServiceAddress(
+                    application.getAppId(), collectorAddr);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index 548c861..d667c0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -42,6 +41,7 @@ import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 
 /**
  * Service that handles collector information. It is used only if the timeline
@@ -116,10 +116,10 @@ public class NMCollectorService extends CompositeService implements
         String collectorAddr = collector.getCollectorAddr();
         newCollectorsMap.put(appId, collectorAddr);
         // set registered collector address to TimelineClient.
-        TimelineClient client =
-            context.getApplications().get(appId).getTimelineClient();
-        if (client != null) {
-          client.setTimelineServiceAddress(collectorAddr);
+        NMTimelinePublisher nmTimelinePublisher =
+            context.getNMTimelinePublisher();
+        if (nmTimelinePublisher != null) {
+          nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr);
         }
       }
       ((NodeManager.NMContext)context).addRegisteredCollectors(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
index 5de3398..aee0862 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 
@@ -41,7 +40,4 @@ public interface Application extends EventHandler<ApplicationEvent> {
   String getFlowVersion();
 
   long getFlowRunId();
-  
-  TimelineClient getTimelineClient();
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 93c6758..b9786e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -76,7 +76,6 @@ public class ApplicationImpl implements Application {
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private final Context context;
-  private TimelineClient timelineClient;
 
   private static final Log LOG = LogFactory.getLog(ApplicationImpl.class);
 
@@ -110,7 +109,7 @@ public class ApplicationImpl implements Application {
       }
       this.flowContext = flowContext;
       if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
-        createAndStartTimelineClient(conf);
+        context.getNMTimelinePublisher().createTimelineClient(appId);
       }
     }
   }
@@ -142,13 +141,6 @@ public class ApplicationImpl implements Application {
     }
   }
 
-  private void createAndStartTimelineClient(Configuration conf) {
-    // create and start timeline client
-    this.timelineClient = TimelineClient.createTimelineClient(appId);
-    timelineClient.init(conf);
-    timelineClient.start();
-  }
-
   @Override
   public String getUser() {
     return user.toString();
@@ -160,11 +152,6 @@ public class ApplicationImpl implements Application {
   }
   
   @Override
-  public TimelineClient getTimelineClient() {
-    return timelineClient;
-  }
-
-  @Override
   public ApplicationState getApplicationState() {
     this.readLock.lock();
     try {
@@ -494,9 +481,10 @@ public class ApplicationImpl implements Application {
         registeredCollectors.remove(app.getAppId());
       }
       // stop timelineClient when application get finished.
-      TimelineClient timelineClient = app.getTimelineClient();
-      if (timelineClient != null) {
-        timelineClient.stop();
+      NMTimelinePublisher nmTimelinePublisher =
+          app.context.getNMTimelinePublisher();
+      if (nmTimelinePublisher != null) {
+        nmTimelinePublisher.stopTimelineClient(app.getAppId());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 70b7e8d..4d3dafd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -41,16 +42,15 @@ import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -72,9 +72,12 @@ public class NMTimelinePublisher extends CompositeService {
 
   private String httpAddress;
 
+  protected final Map<ApplicationId, TimelineClient> appToClientMap;
+
   public NMTimelinePublisher(Context context) {
     super(NMTimelinePublisher.class.getName());
     this.context = context;
+    appToClientMap = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -82,12 +85,6 @@ public class NMTimelinePublisher extends CompositeService {
     dispatcher = new AsyncDispatcher();
     dispatcher.register(NMTimelineEventType.class,
         new ForwardingEventHandler());
-    dispatcher
-        .register(ContainerEventType.class, new ContainerEventHandler());
-    dispatcher.register(ApplicationEventType.class,
-        new ApplicationEventHandler());
-    dispatcher.register(LocalizationEventType.class,
-        new LocalizationEventDispatcher());
     addIfService(dispatcher);
     super.serviceInit(conf);
   }
@@ -112,7 +109,6 @@ public class NMTimelinePublisher extends CompositeService {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public void reportContainerResourceUsage(Container container, Long pmemUsage,
       Float cpuUsagePercentPerCore) {
     if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
@@ -133,15 +129,32 @@ public class NMTimelinePublisher extends CompositeService {
             Math.round(cpuUsagePercentPerCore));
         entity.addMetric(cpuMetric);
       }
-      dispatcher.getEventHandler()
-          .handle(new TimelinePublishEvent(entity, container.getContainerId()
-              .getApplicationAttemptId().getApplicationId()));
+      ApplicationId appId = container.getContainerId().getApplicationAttemptId()
+          .getApplicationId();
+      try {
+        // No need to route this through the publisher's dispatcher: the
+        // timeline client already queues entities submitted asynchronously.
+        TimelineClient timelineClient = getTimelineClient(appId);
+        if (timelineClient != null) {
+          timelineClient.putEntitiesAsync(entity);
+        } else {
+          LOG.error("Seems like client has been removed before the container"
+              + " metric could be published for " + container.getContainerId());
+        }
+      } catch (IOException | YarnException e) {
+        LOG.error("Failed to publish Container metrics for container "
+            + container.getContainerId(), e);
+      }
     }
   }
 
-  private void publishContainerCreatedEvent(ContainerEntity entity,
-      ContainerId containerId, Resource resource, Priority priority,
-      long timestamp) {
+  @SuppressWarnings("unchecked")
+  private void publishContainerCreatedEvent(ContainerEvent event) {
+    ContainerId containerId = event.getContainerID();
+    ContainerEntity entity = createContainerEntity(containerId);
+    Container container = context.getContainers().get(containerId);
+    Resource resource = container.getResource();
+
     Map<String, Object> entityInfo = new HashMap<String, Object>();
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
         resource.getMemory());
@@ -152,7 +165,7 @@ public class NMTimelinePublisher extends CompositeService {
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
         nodeId.getPort());
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        priority.toString());
+        container.getPriority().toString());
     entityInfo.put(
         ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
         httpAddress);
@@ -160,13 +173,15 @@ public class NMTimelinePublisher extends CompositeService {
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(timestamp);
+    tEvent.setTimestamp(event.getTimestamp());
 
     entity.addEvent(tEvent);
-    entity.setCreatedTime(timestamp);
-    putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
+    entity.setCreatedTime(event.getTimestamp());
+    dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+        containerId.getApplicationAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   private void publishContainerFinishedEvent(ContainerStatus containerStatus,
       long timeStamp) {
     ContainerId containerId = containerStatus.getContainerId();
@@ -186,7 +201,38 @@ public class NMTimelinePublisher extends CompositeService {
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
+
+    dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+        containerId.getApplicationAttemptId().getApplicationId()));
+  }
+
+  private void publishContainerLocalizationEvent(
+      ContainerLocalizationEvent event, String eventType) {
+    Container container = event.getContainer();
+    ContainerId containerId = container.getContainerId();
+    TimelineEntity entity = createContainerEntity(containerId);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(eventType);
+    tEvent.setTimestamp(event.getTimestamp());
+    entity.addEvent(tEvent);
+
+    ApplicationId appId =
+        container.getContainerId().getApplicationAttemptId().getApplicationId();
+    try {
+      // No need to route this through the publisher's dispatcher: the
+      // timeline client already queues entities submitted asynchronously.
+      TimelineClient timelineClient = getTimelineClient(appId);
+      if (timelineClient != null) {
+        timelineClient.putEntitiesAsync(entity);
+      } else {
+        LOG.error("Seems like client has been removed before the event could be"
+            + " published for " + container.getContainerId());
+      }
+    } catch (IOException | YarnException e) {
+      LOG.error("Failed to publish Container metrics for container "
+          + container.getContainerId(), e);
+    }
   }
 
   private static ContainerEntity createContainerEntity(
@@ -207,23 +253,33 @@ public class NMTimelinePublisher extends CompositeService {
         LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
             + TimelineUtils.dumpTimelineRecordtoJSON(entity));
       }
-      TimelineClient timelineClient =
-          context.getApplications().get(appId).getTimelineClient();
-      timelineClient.putEntities(entity);
+      TimelineClient timelineClient = getTimelineClient(appId);
+      if (timelineClient != null) {
+        timelineClient.putEntities(entity);
+      } else {
+        LOG.error("Seems like client has been removed before the entity "
+            + "could be published for " + entity);
+      }
     } catch (Exception e) {
       LOG.error("Error when publishing entity " + entity, e);
     }
   }
 
-  @SuppressWarnings("unchecked")
   public void publishApplicationEvent(ApplicationEvent event) {
     // publish only when the desired event is received
     switch (event.getType()) {
     case INIT_APPLICATION:
     case FINISH_APPLICATION:
-    case APPLICATION_CONTAINER_FINISHED:
     case APPLICATION_LOG_HANDLING_FAILED:
-      dispatcher.getEventHandler().handle(event);
+      // TODO: handle these events in the future; it is not yet clear
+      // which entity they should be published under.
+      break;
+    case APPLICATION_CONTAINER_FINISHED:
+      // this is actually used to publish the container Event
+      ApplicationContainerFinishedEvent evnt =
+          (ApplicationContainerFinishedEvent) event;
+      publishContainerFinishedEvent(evnt.getContainerStatus(),
+          event.getTimestamp());
       break;
 
     default:
@@ -235,12 +291,11 @@ public class NMTimelinePublisher extends CompositeService {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public void publishContainerEvent(ContainerEvent event) {
     // publish only when the desired event is received
     switch (event.getType()) {
     case INIT_CONTAINER:
-      dispatcher.getEventHandler().handle(event);
+      publishContainerCreatedEvent(event);
       break;
 
     default:
@@ -253,15 +308,17 @@ public class NMTimelinePublisher extends CompositeService {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public void publishLocalizationEvent(LocalizationEvent event) {
     // publish only when the desired event is received
     switch (event.getType()) {
     case CONTAINER_RESOURCES_LOCALIZED:
+      publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
+          ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE);
+      break;
     case INIT_CONTAINER_RESOURCES:
-      dispatcher.getEventHandler().handle(event);
+      publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
+          ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE);
       break;
-
     default:
       if (LOG.isDebugEnabled()) {
         LOG.debug(event.getType()
@@ -272,64 +329,6 @@ public class NMTimelinePublisher extends CompositeService {
     }
   }
 
-  private class ApplicationEventHandler implements
-      EventHandler<ApplicationEvent> {
-    @Override
-    public void handle(ApplicationEvent event) {
-      switch (event.getType()) {
-      case APPLICATION_CONTAINER_FINISHED:
-        // this is actually used to publish the container Event
-        ApplicationContainerFinishedEvent evnt =
-            (ApplicationContainerFinishedEvent) event;
-        publishContainerFinishedEvent(evnt.getContainerStatus(),
-            event.getTimestamp());
-        break;
-      default:
-        LOG.error("Seems like event type is captured only in "
-            + "publishApplicationEvent method and not handled here");
-        break;
-      }
-    }
-  }
-
-  private class ContainerEventHandler implements EventHandler<ContainerEvent> {
-    @Override
-    public void handle(ContainerEvent event) {
-      ContainerId containerId = event.getContainerID();
-      Container container = context.getContainers().get(containerId);
-      long timestamp = event.getTimestamp();
-      ContainerEntity entity = createContainerEntity(containerId);
-
-      switch (event.getType()) {
-      case INIT_CONTAINER:
-        publishContainerCreatedEvent(entity, containerId,
-            container.getResource(), container.getPriority(), timestamp);
-        break;
-      default:
-        LOG.error("Seems like event type is captured only in "
-            + "publishContainerEvent method and not handled here");
-        break;
-      }
-    }
-  }
-
-  private static final class LocalizationEventDispatcher implements
-      EventHandler<LocalizationEvent> {
-    @Override
-    public void handle(LocalizationEvent event) {
-      switch (event.getType()) {
-      case INIT_CONTAINER_RESOURCES:
-      case CONTAINER_RESOURCES_LOCALIZED:
-        // TODO after priority based flush jira is finished
-        break;
-      default:
-        LOG.error("Seems like event type is captured only in "
-            + "publishLocalizationEvent method and not handled here");
-        break;
-      }
-    }
-  }
-
   /**
    * EventHandler implementation which forward events to NMMetricsPublisher.
    * Making use of it, NMMetricsPublisher can avoid to have a public handle
@@ -363,4 +362,33 @@ public class NMTimelinePublisher extends CompositeService {
       return entityToPublish;
     }
   }
+
+  public void createTimelineClient(ApplicationId appId) {
+    if (!appToClientMap.containsKey(appId)) {
+      TimelineClient timelineClient =
+          TimelineClient.createTimelineClient(appId);
+      timelineClient.init(getConfig());
+      timelineClient.start();
+      appToClientMap.put(appId, timelineClient);
+    }
+  }
+
+  public void stopTimelineClient(ApplicationId appId) {
+    TimelineClient client = appToClientMap.remove(appId);
+    if (client != null) {
+      client.stop();
+    }
+  }
+
+  public void setTimelineServiceAddress(ApplicationId appId,
+      String collectorAddr) {
+    TimelineClient client = appToClientMap.get(appId);
+    if (client != null) {
+      client.setTimelineServiceAddress(collectorAddr);
+    }
+  }
+
+  private TimelineClient getTimelineClient(ApplicationId appId) {
+    return appToClientMap.get(appId);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index 830ed6b..4aa28d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -20,14 +20,12 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -39,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.junit.Assert;
@@ -53,20 +50,23 @@ public class TestNMTimelinePublisher {
   public void testContainerResourceUsage() {
     Context context = mock(Context.class);
     @SuppressWarnings("unchecked")
-    ConcurrentMap<ApplicationId, Application> map = mock(ConcurrentMap.class);
-    Application aApp = mock(Application.class);
-    when(map.get(any(ApplicationId.class))).thenReturn(aApp);
-    DummyTimelineClient timelineClient = new DummyTimelineClient();
-    when(aApp.getTimelineClient()).thenReturn(timelineClient);
-    when(context.getApplications()).thenReturn(map);
+    final DummyTimelineClient timelineClient = new DummyTimelineClient();
     when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
     when(context.getHttpPort()).thenReturn(0);
-    NMTimelinePublisher publisher = new NMTimelinePublisher(context);
+    NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
+      public void createTimelineClient(ApplicationId appId) {
+        if (!appToClientMap.containsKey(appId)) {
+          appToClientMap.put(appId, timelineClient);
+        }
+      }
+    };
     publisher.init(new Configuration());
     publisher.start();
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    publisher.createTimelineClient(appId);
     Container aContainer = mock(Container.class);
     when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
-        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
+        ApplicationAttemptId.newInstance(appId, 1),
         0L));
     publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
     verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
@@ -141,7 +141,7 @@ public class TestNMTimelinePublisher {
     private TimelineEntity[] lastPublishedEntities;
 
     @Override
-    public void putEntities(TimelineEntity... entities)
+    public void putEntitiesAsync(TimelineEntity... entities)
         throws IOException, YarnException {
       this.lastPublishedEntities = entities;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f746c80b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
index 4d1be84..c983040 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
@@ -101,9 +101,4 @@ public class MockApp implements Application {
   public long getFlowRunId() {
     return flowRunId;
   }
-  
-  @Override
-  public TimelineClient getTimelineClient() {
-    return timelineClient;
-  }
 }