You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/02/09 18:07:57 UTC

hadoop git commit: YARN-3367. Replace starting a separate thread for post entity with event loop in TimelineClient (Naganarasimha G R via sjlee)

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 db76a3ad0 -> d491ef080


YARN-3367. Replace starting a separate thread for post entity with event loop in TimelineClient (Naganarasimha G R via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d491ef08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d491ef08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d491ef08

Branch: refs/heads/YARN-2928
Commit: d491ef080096c62964b8327555bf47ceae6e9292
Parents: db76a3a
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue Feb 9 09:07:37 2016 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Feb 9 09:07:37 2016 -0800

----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      |  61 +---
 .../mapred/JobHistoryFileReplayMapper.java      |   8 +-
 .../hadoop/mapred/TimelineEntityConverter.java  |  12 +-
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../timelineservice/TimelineEntities.java       |  17 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |   6 +
 .../distributedshell/ApplicationMaster.java     |  78 +----
 .../api/async/impl/AMRMClientAsyncImpl.java     |  26 +-
 .../hadoop/yarn/client/api/TimelineClient.java  |   8 +-
 .../client/api/impl/TimelineClientImpl.java     | 286 ++++++++++++++---
 .../src/main/resources/yarn-default.xml         |   7 +
 .../api/impl/TestTimelineClientV2Impl.java      | 304 +++++++++++++++++++
 .../nodemanager/NodeStatusUpdaterImpl.java      |   4 +-
 13 files changed, 623 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 6e5afb1..1c5446f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -27,10 +27,7 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -75,7 +72,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * The job history events get routed to this class. This class writes the Job
  * history events to the DFS directly into a staging dir and then moved to a
@@ -129,10 +125,6 @@ public class JobHistoryEventHandler extends AbstractService
   
   private boolean timelineServiceV2Enabled = false;
 
-  // For posting entities in new timeline service in a non-blocking way
-  // TODO YARN-3367 replace with event loop in TimelineClient.
-  private ExecutorService threadPool;
-
   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
   private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE =
@@ -272,10 +264,6 @@ public class JobHistoryEventHandler extends AbstractService
             YarnConfiguration.timelineServiceV2Enabled(conf);
         LOG.info("Timeline service is enabled; version: " +
             YarnConfiguration.getTimelineServiceVersion(conf));
-        if (timelineServiceV2Enabled) {
-          // initialize the thread pool for v.2 timeline service
-          threadPool = createThreadPool();
-        }
       } else {
         LOG.info("Timeline service is not enabled");
       }
@@ -449,35 +437,9 @@ public class JobHistoryEventHandler extends AbstractService
     if (timelineClient != null) {
       timelineClient.stop();
     }
-    if (threadPool != null) {
-      shutdownAndAwaitTermination();
-    }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
   }
-  
-  // TODO remove threadPool after adding non-blocking call in TimelineClient
-  private ExecutorService createThreadPool() {
-    return Executors.newCachedThreadPool(
-      new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-      .build());
-  }
-
-  private void shutdownAndAwaitTermination() {
-    threadPool.shutdown();
-    try {
-      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
-        threadPool.shutdownNow(); 
-        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
-          LOG.error("ThreadPool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      threadPool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
 
   protected EventWriter createEventWriter(Path historyFilePath)
       throws IOException {
@@ -1072,21 +1034,6 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
   
-  private void putEntityWithoutBlocking(final TimelineClient client,
-      final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
-    Runnable publishWrapper = new Runnable() {
-      public void run() {
-        try {
-          client.putEntities(entity);
-        } catch (IOException|YarnException e) {
-          LOG.error("putEntityNonBlocking get failed: " + e);
-          throw new RuntimeException(e.toString());
-        }
-      }
-    };
-    threadPool.execute(publishWrapper);
-  }
-  
   // create JobEntity from HistoryEvent with adding other info, like: 
   // jobId, timestamp and entityType.
   private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity 
@@ -1247,7 +1194,13 @@ public class JobHistoryEventHandler extends AbstractService
             taskId, setCreatedTime);
       }
     }
-    putEntityWithoutBlocking(timelineClient, tEntity);
+    try {
+      timelineClient.putEntitiesAsync(tEntity);
+    } catch (IOException | YarnException e) {
+      LOG.error("Failed to process Event " + event.getEventType()
+          + " for the job : " + jobId, e);
+    }
+
   }
 
   private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
index 802b78f..4fb5308 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -176,7 +176,7 @@ class JobHistoryFileReplayMapper extends EntityWriter {
 
         // create entities from job history and write them
         long totalTime = 0;
-        Set<TimelineEntity> entitySet =
+        List<TimelineEntity> entitySet =
             converter.createTimelineEntities(jobInfo, jobConf);
         LOG.info("converted them into timeline entities for job " + jobIdStr);
         // use the current user for this purpose
@@ -215,7 +215,7 @@ class JobHistoryFileReplayMapper extends EntityWriter {
   }
 
   private void writeAllEntities(AppLevelTimelineCollector collector,
-      Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+      List<TimelineEntity> entitySet, UserGroupInformation ugi)
       throws IOException {
     TimelineEntities entities = new TimelineEntities();
     entities.setEntities(entitySet);
@@ -223,7 +223,7 @@ class JobHistoryFileReplayMapper extends EntityWriter {
   }
 
   private void writePerEntity(AppLevelTimelineCollector collector,
-      Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+      List<TimelineEntity> entitySet, UserGroupInformation ugi)
       throws IOException {
     for (TimelineEntity entity : entitySet) {
       TimelineEntities entities = new TimelineEntities();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
index 880014b..0e2eb72 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -57,16 +59,16 @@ class TimelineEntityConverter {
    * Note that we also do not add info to the YARN application entity, which
    * would be needed for aggregation.
    */
-  public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
+  public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
       Configuration conf) {
-    Set<TimelineEntity> entities = new HashSet<>();
+    List<TimelineEntity> entities = new ArrayList<>();
 
     // create the job entity
     TimelineEntity job = createJobEntity(jobInfo, conf);
     entities.add(job);
 
     // create the task and task attempt entities
-    Set<TimelineEntity> tasksAndAttempts =
+    List<TimelineEntity> tasksAndAttempts =
         createTaskAndTaskAttemptEntities(jobInfo);
     entities.addAll(tasksAndAttempts);
 
@@ -125,9 +127,9 @@ class TimelineEntityConverter {
     }
   }
 
-  private Set<TimelineEntity> createTaskAndTaskAttemptEntities(
+  private List<TimelineEntity> createTaskAndTaskAttemptEntities(
       JobInfo jobInfo) {
-    Set<TimelineEntity> entities = new HashSet<>();
+    List<TimelineEntity> entities = new ArrayList<>();
     Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
     LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
         " tasks");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f6bf667..4c77b67 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -167,6 +167,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4446. Refactor reader API for better extensibility (Varun Saxena via
     sjlee)
 
+    YARN-3367. Replace starting a separate thread for post entity with event
+    loop in TimelineClient (Naganarasimha G R via sjlee)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
index f08a0ec..63989e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
@@ -17,15 +17,16 @@
  */
 package org.apache.hadoop.yarn.api.records.timelineservice;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.HashSet;
-import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * This class hosts a set of timeline entities.
@@ -36,22 +37,22 @@ import java.util.Set;
 @InterfaceStability.Unstable
 public class TimelineEntities {
 
-  private Set<TimelineEntity> entities = new HashSet<>();
+  private List<TimelineEntity> entities = new ArrayList<>();
 
   public TimelineEntities() {
 
   }
 
   @XmlElement(name = "entities")
-  public Set<TimelineEntity> getEntities() {
+  public List<TimelineEntity> getEntities() {
     return entities;
   }
 
-  public void setEntities(Set<TimelineEntity> timelineEntities) {
+  public void setEntities(List<TimelineEntity> timelineEntities) {
     this.entities = timelineEntities;
   }
 
-  public void addEntities(Set<TimelineEntity> timelineEntities) {
+  public void addEntities(List<TimelineEntity> timelineEntities) {
     this.entities.addAll(timelineEntities);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9b43fbd..6ac6fb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1762,6 +1762,12 @@ public class YarnConfiguration extends Configuration {
 
   public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000;
 
+  public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE =
+      TIMELINE_SERVICE_PREFIX
+          + "timeline-client.number-of-async-entities-to-merge";
+
+  public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10;
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 1c68086..cb5f53b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -39,9 +39,6 @@ import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
@@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * An ApplicationMaster for executing shell commands on a set of launched
@@ -219,10 +215,6 @@ public class ApplicationMaster {
 
   private boolean timelineServiceV2 = false;
 
-  // For posting entities in new timeline service in a non-blocking way
-  // TODO replace with event loop in TimelineClient.
-  private ExecutorService threadPool;
-
   // App Master configuration
   // No. of containers to run shell command on
   @VisibleForTesting
@@ -311,10 +303,6 @@ public class ApplicationMaster {
       }
       appMaster.run();
       result = appMaster.finish();
-
-      if (appMaster.threadPool != null) {
-        appMaster.shutdownAndAwaitTermination();
-      }
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       LogManager.shutdown();
@@ -329,29 +317,6 @@ public class ApplicationMaster {
     }
   }
 
-  //TODO remove threadPool after adding non-blocking call in TimelineClient
-  private ExecutorService createThreadPool() {
-    return Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-        .build());
-  }
-
-  private void shutdownAndAwaitTermination() {
-    threadPool.shutdown();
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
-        threadPool.shutdownNow();
-        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
-          LOG.error("ThreadPool did not terminate");
-      }
-    } catch (InterruptedException ie) {
-      threadPool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
   /**
    * Dump out contents of $CWD and the environment to stdout for debugging
    */
@@ -547,11 +512,7 @@ public class ApplicationMaster {
         .getOptionValue("priority", "0"));
 
     if (YarnConfiguration.timelineServiceEnabled(conf)) {
-      timelineServiceV2 =
-          YarnConfiguration.timelineServiceV2Enabled(conf);
-      if (timelineServiceV2) {
-        threadPool = createThreadPool();
-      }
+      timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
     } else {
       timelineClient = null;
       LOG.warn("Timeline service is not enabled");
@@ -701,8 +662,10 @@ public class ApplicationMaster {
             if (timelineServiceV2) {
               timelineClient = TimelineClient.createTimelineClient(
                   appAttemptID.getApplicationId());
+              LOG.info("Timeline service V2 client is enabled");
             } else {
               timelineClient = TimelineClient.createTimelineClient();
+              LOG.info("Timeline service V1 client is enabled");
             }
             timelineClient.init(conf);
             timelineClient.start();
@@ -1304,18 +1267,8 @@ public class ApplicationMaster {
             shellId);
     return new Thread(runnableLaunchContainer);
   }
-  
-  private void publishContainerStartEventOnTimelineServiceV2(
-      final Container container) {
-    Runnable publishWrapper = new Runnable() {
-      public void run() {
-        publishContainerStartEventOnTimelineServiceV2Base(container);
-      }
-    };
-    threadPool.execute(publishWrapper);
-  }
 
-  private void publishContainerStartEventOnTimelineServiceV2Base(
+  private void publishContainerStartEventOnTimelineServiceV2(
       Container container) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
@@ -1349,16 +1302,6 @@ public class ApplicationMaster {
 
   private void publishContainerEndEventOnTimelineServiceV2(
       final ContainerStatus container) {
-    Runnable publishWrapper = new Runnable() {
-      public void run() {
-          publishContainerEndEventOnTimelineServiceV2Base(container);
-      }
-    };
-    threadPool.execute(publishWrapper);
-  }
-
-  private void publishContainerEndEventOnTimelineServiceV2Base(
-      final ContainerStatus container) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getContainerId().toString());
@@ -1389,17 +1332,6 @@ public class ApplicationMaster {
   }
 
   private void publishApplicationAttemptEventOnTimelineServiceV2(
-      final DSEvent appEvent) {
-
-    Runnable publishWrapper = new Runnable() {
-      public void run() {
-        publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
-      }
-    };
-    threadPool.execute(publishWrapper);
-  }
-
-  private void publishApplicationAttemptEventOnTimelineServiceV2Base(
       DSEvent appEvent) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
@@ -1417,7 +1349,7 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
-          timelineClient.putEntities(entity);
+          timelineClient.putEntitiesAsync(entity);
           return null;
         }
       });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 212f721..8af0c78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -41,9 +41,9 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -327,6 +327,19 @@ extends AMRMClientAsync<T> {
             LOG.info("Interrupted while waiting for queue", ex);
             continue;
           }
+
+          String collectorAddress = response.getCollectorAddr();
+          TimelineClient timelineClient = client.getRegisteredTimeineClient();
+          if (timelineClient != null && collectorAddress != null
+              && !collectorAddress.isEmpty()) {
+            if (collectorAddr == null
+                || !collectorAddr.equals(collectorAddress)) {
+              collectorAddr = collectorAddress;
+              timelineClient.setTimelineServiceAddress(collectorAddress);
+              LOG.info("collectorAddress " + collectorAddress);
+            }
+          }
+
           List<NodeReport> updatedNodes = response.getUpdatedNodes();
           if (!updatedNodes.isEmpty()) {
             handler.onNodesUpdated(updatedNodes);
@@ -354,17 +367,6 @@ extends AMRMClientAsync<T> {
           if (!allocated.isEmpty()) {
             handler.onContainersAllocated(allocated);
           }
-
-          String collectorAddress = response.getCollectorAddr();
-          TimelineClient timelineClient = client.getRegisteredTimeineClient();
-          if (timelineClient != null && collectorAddress != null
-              && !collectorAddress.isEmpty()) {
-            if (collectorAddr == null ||
-                !collectorAddr.equals(collectorAddress)) {
-              collectorAddr = collectorAddress;
-              timelineClient.setTimelineServiceAddress(collectorAddress);
-            }
-          }
           progress = handler.getProgress();
         } catch (Throwable ex) {
           handler.onError(ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index ade4f9a..24d9f32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -27,12 +27,12 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -53,7 +53,7 @@ public abstract class TimelineClient extends AbstractService {
    * construct and initialize a timeline client if the following operations are
    * supposed to be conducted by that user.
    */
-  private ApplicationId contextAppId;
+  protected ApplicationId contextAppId;
 
   /**
    * Creates an instance of the timeline v.1.x client.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index a158a56..c8e6481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -29,6 +29,14 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
@@ -129,6 +137,8 @@ public class TimelineClientImpl extends TimelineClient {
   @VisibleForTesting
   TimelineClientConnectionRetry connectionRetry;
 
+  private TimelineEntityDispatcher entityDispatcher;
+
   // Abstract class for an operation that should be retried by timeline client
   private static abstract class TimelineClientRetryOp {
     // The operation that should be retried
@@ -312,6 +322,7 @@ public class TimelineClientImpl extends TimelineClient {
       serviceRetryInterval = conf.getLong(
           YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+      entityDispatcher = new TimelineEntityDispatcher(conf);
     } else {
       if (YarnConfiguration.useHttps(conf)) {
         setTimelineServiceAddress(conf.get(
@@ -332,7 +343,9 @@ public class TimelineClientImpl extends TimelineClient {
 
   @Override
   protected void serviceStart() throws Exception {
-    if (!timelineServiceV2) {
+    if (timelineServiceV2) {
+      entityDispatcher.start();
+    } else {
       timelineWriter = createTimelineWriter(configuration, authUgi, client,
           constructResURI(getConfig(), timelineServiceAddress, false));
     }
@@ -354,6 +367,9 @@ public class TimelineClientImpl extends TimelineClient {
     if (this.timelineWriter != null) {
       this.timelineWriter.close();
     }
+    if (timelineServiceV2) {
+      entityDispatcher.stop();
+    }
     super.serviceStop();
   }
 
@@ -366,37 +382,21 @@ public class TimelineClientImpl extends TimelineClient {
   @Override
   public void putEntities(
       org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
-      throws IOException, YarnException {
-    putEntities(false, entities);
+          throws IOException, YarnException {
+    if (!timelineServiceV2) {
+      throw new YarnException("v.2 method is invoked on a v.1.x client");
+    }
+    entityDispatcher.dispatchEntities(true, entities);
   }
 
   @Override
   public void putEntitiesAsync(
       org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
       throws IOException, YarnException {
-    putEntities(true, entities);
-  }
-
-  private void putEntities(boolean async,
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
-      throws IOException, YarnException {
     if (!timelineServiceV2) {
       throw new YarnException("v.2 method is invoked on a v.1.x client");
     }
-    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-        entitiesContainer =
-        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
-    for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) {
-      entitiesContainer.addEntity(entity);
-    }
-    MultivaluedMap<String, String> params = new MultivaluedMapImpl();
-    if (getContextAppId() != null) {
-      params.add("appid", getContextAppId().toString());
-    }
-    if (async) {
-      params.add("async", Boolean.TRUE.toString());
-    }
-    putObjects("entities", params, entitiesContainer);
+    entityDispatcher.dispatchEntities(false, entities);
   }
 
   @Override
@@ -407,20 +407,10 @@ public class TimelineClientImpl extends TimelineClient {
 
   // Used for new timeline service only
   @Private
-  public void putObjects(String path, MultivaluedMap<String, String> params,
+  protected void putObjects(String path, MultivaluedMap<String, String> params,
       Object obj) throws IOException, YarnException {
 
-    // timelineServiceAddress could haven't be initialized yet
-    // or stale (only for new timeline service)
-    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
-    if (timelineServiceAddress == null) {
-      String errMessage = "TimelineClient has reached to max retry times : "
-          + this.maxServiceRetries
-          + ", but failed to fetch timeline service address. Please verify"
-          + " Timeline Auxillary Service is configured in all the NMs";
-      LOG.error(errMessage);
-      throw new YarnException(errMessage);
-    }
+    int retries = verifyRestEndPointAvailable();
 
     // timelineServiceAddress could be stale, add retry logic here.
     boolean needRetry = true;
@@ -438,6 +428,21 @@ public class TimelineClientImpl extends TimelineClient {
     }
   }
 
+  private int verifyRestEndPointAvailable() throws YarnException {
+    // timelineServiceAddress could haven't be initialized yet
+    // or stale (only for new timeline service)
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    if (timelineServiceAddress == null) {
+      String errMessage = "TimelineClient has reached to max retry times : "
+          + this.maxServiceRetries
+          + ", but failed to fetch timeline service address. Please verify"
+          + " Timeline Auxillary Service is configured in all the NMs";
+      LOG.error(errMessage);
+      throw new YarnException(errMessage);
+    }
+    return retries;
+  }
+
   /**
    * Check if reaching to maximum of retries.
    * @param retries
@@ -643,7 +648,7 @@ public class TimelineClientImpl extends TimelineClient {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      timelineServiceAddress = getTimelineServiceAddress();
+      // timelineServiceAddress = getTimelineServiceAddress();
       retries--;
     }
     return retries;
@@ -862,4 +867,213 @@ public class TimelineClientImpl extends TimelineClient {
   public void setTimelineWriter(TimelineWriter writer) {
     this.timelineWriter = writer;
   }
+
+  private final class EntitiesHolder extends FutureTask<Void> {
+    private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities;
+    private final boolean isSync;
+
+    EntitiesHolder(
+        final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities,
+        final boolean isSync) {
+      super(new Callable<Void>() {
+        // publishEntities()
+        public Void call() throws Exception {
+          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+          params.add("appid", contextAppId.toString());
+          params.add("async", Boolean.toString(!isSync));
+          putObjects("entities", params, entities);
+          return null;
+        }
+      });
+      this.entities = entities;
+      this.isSync = isSync;
+    }
+
+    public boolean isSync() {
+      return isSync;
+    }
+
+    public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() {
+      return entities;
+    }
+  }
+
+  /**
+   * This class is responsible for collecting the timeline entities and
+   * publishing them in async.
+   */
+  private class TimelineEntityDispatcher {
+    /**
+     * Time period for which the timelineclient will wait for draining after
+     * stop
+     */
+    private static final long DRAIN_TIME_PERIOD = 2000L;
+
+    private int numberOfAsyncsToMerge;
+    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+    private ExecutorService executor;
+
+    TimelineEntityDispatcher(Configuration conf) {
+      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+      numberOfAsyncsToMerge =
+          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+    }
+
+    Runnable createRunnable() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            EntitiesHolder entitiesHolder;
+            while (!Thread.currentThread().isInterrupted()) {
+              // Merge all the async calls and make one push, but if its sync
+              // call push immediately
+              try {
+                entitiesHolder = timelineEntityQueue.take();
+              } catch (InterruptedException ie) {
+                LOG.info("Timeline dispatcher thread was interrupted ");
+                Thread.currentThread().interrupt();
+                return;
+              }
+              if (entitiesHolder != null) {
+                publishWithoutBlockingOnQueue(entitiesHolder);
+              }
+            }
+          } finally {
+            if (!timelineEntityQueue.isEmpty()) {
+              LOG.info("Yet to publish " + timelineEntityQueue.size()
+                  + " timelineEntities, draining them now. ");
+            }
+            // Try to drain the remaining entities to be published @ the max for
+            // 2 seconds
+            long timeTillweDrain =
+                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+            while (!timelineEntityQueue.isEmpty()) {
+              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+              if (System.currentTimeMillis() > timeTillweDrain) {
+                // time elapsed stop publishing further....
+                if (!timelineEntityQueue.isEmpty()) {
+                  LOG.warn("Time to drain elapsed! Remaining "
+                      + timelineEntityQueue.size() + "timelineEntities will not"
+                      + " be published");
+                  // if some entities were not drained then we need interrupt
+                  // the threads which had put sync EntityHolders to the queue.
+                  EntitiesHolder nextEntityInTheQueue = null;
+                  while ((nextEntityInTheQueue =
+                      timelineEntityQueue.poll()) != null) {
+                    nextEntityInTheQueue.cancel(true);
+                  }
+                }
+                break;
+              }
+            }
+          }
+        }
+
+        /**
+         * Publishes the given EntitiesHolder and return immediately if sync
+         * call, else tries to fetch the EntitiesHolder from the queue in non
+         * blocking fashion and collate the Entities if possible before
+         * publishing through REST.
+         *
+         * @param entitiesHolder
+         */
+        private void publishWithoutBlockingOnQueue(
+            EntitiesHolder entitiesHolder) {
+          if (entitiesHolder.isSync()) {
+            entitiesHolder.run();
+            return;
+          }
+          int count = 1;
+          while (true) {
+            // loop till we find a sync put Entities or there is nothing
+            // to take
+            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+            if (nextEntityInTheQueue == null) {
+              // Nothing in the queue just publish and get back to the
+              // blocked wait state
+              entitiesHolder.run();
+              break;
+            } else if (nextEntityInTheQueue.isSync()) {
+              // flush all the prev async entities first
+              entitiesHolder.run();
+              // and then flush the sync entity
+              nextEntityInTheQueue.run();
+              break;
+            } else {
+              // append all async entities together and then flush
+              entitiesHolder.getEntities().addEntities(
+                  nextEntityInTheQueue.getEntities().getEntities());
+              count++;
+              if (count == numberOfAsyncsToMerge) {
+                // Flush the entities if the number of the async
+                // putEntites merged reaches the desired limit. To avoid
+                // collecting multiple entities and delaying for a long
+                // time.
+                entitiesHolder.run();
+                break;
+              }
+            }
+          }
+        }
+      };
+    }
+
+    public void dispatchEntities(boolean sync,
+        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished)
+            throws YarnException {
+      if (executor.isShutdown()) {
+        throw new YarnException("Timeline client is in the process of stopping,"
+            + " not accepting any more TimelineEntities");
+      }
+
+      // wrap all TimelineEntity into TimelineEntities object
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities =
+          new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
+      for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) {
+        entities.addEntity(entity);
+      }
+
+      // created a holder and place it in queue
+      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+      try {
+        timelineEntityQueue.put(entitiesHolder);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(
+            "Failed while adding entity to the queue for publishing", e);
+      }
+
+      if (sync) {
+        // In sync call we need to wait till its published and if any error then
+        // throw it back
+        try {
+          entitiesHolder.get();
+        } catch (ExecutionException e) {
+          throw new YarnException("Failed while publishing entity",
+              e.getCause());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while publishing entity", e);
+        }
+      }
+    }
+
+    public void start() {
+      executor = Executors.newSingleThreadExecutor();
+      executor.execute(createRunnable());
+    }
+
+    public void stop() {
+      LOG.info("Stopping TimelineClient.");
+      executor.shutdownNow();
+      try {
+        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        e.printStackTrace();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b521599..2cbc836 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2081,6 +2081,13 @@
     <value>1000</value>
   </property>
 
+  <property>
+    <description>Time line V2 client tries to merge these many number of
+    async entities (if available) and then call the REST ATS V2 API to submit.
+    </description>
+    <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
+    <value>10</value>
+  </property>
   <!--  Shared Cache Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
new file mode 100644
index 0000000..7803f94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTimelineClientV2Impl {
+  private static final Log LOG =
+      LogFactory.getLog(TestTimelineClientV2Impl.class);
+  private TestV2TimelineClient client;
+  private static long TIME_TO_SLEEP = 150;
+
+  @Before
+  public void setup() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+    conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
+    client = createTimelineClient(conf);
+  }
+
+  private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
+    ApplicationId id = ApplicationId.newInstance(0, 0);
+    TestV2TimelineClient client = new TestV2TimelineClient(id);
+    client.init(conf);
+    client.start();
+    return client;
+  }
+
+  private class TestV2TimelineClient extends TimelineClientImpl {
+    private boolean sleepBeforeReturn;
+    private boolean throwException;
+
+    private List<TimelineEntities> publishedEntities;
+
+    public TimelineEntities getPublishedEntities(int putIndex) {
+      Assert.assertTrue("Not So many entities Published",
+          putIndex < publishedEntities.size());
+      return publishedEntities.get(putIndex);
+    }
+
+    public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
+      this.sleepBeforeReturn = sleepBeforeReturn;
+    }
+
+    public void setThrowException(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    public int getNumOfTimelineEntitiesPublished() {
+      return publishedEntities.size();
+    }
+
+    public TestV2TimelineClient(ApplicationId id) {
+      super(id);
+      publishedEntities = new ArrayList<TimelineEntities>();
+    }
+
+    protected void putObjects(String path,
+        MultivaluedMap<String, String> params, Object obj)
+            throws IOException, YarnException {
+      if (throwException) {
+        throw new YarnException("ActualException");
+      }
+      publishedEntities.add((TimelineEntities) obj);
+      if (sleepBeforeReturn) {
+        try {
+          Thread.sleep(TIME_TO_SLEEP);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testPostEntities() throws Exception {
+    try {
+      client.putEntities(generateEntity("1"));
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected");
+    }
+  }
+
+  @Test
+  public void testASyncCallMerge() throws Exception {
+    client.setSleepBeforeReturn(true);
+    try {
+      client.putEntitiesAsync(generateEntity("1"));
+      Thread.sleep(TIME_TO_SLEEP / 2);
+      // by the time first put response comes push 2 entities in the queue
+      client.putEntitiesAsync(generateEntity("2"));
+      client.putEntitiesAsync(generateEntity("3"));
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected");
+    }
+    for (int i = 0; i < 4; i++) {
+      if (client.getNumOfTimelineEntitiesPublished() == 2) {
+        break;
+      }
+      Thread.sleep(TIME_TO_SLEEP);
+    }
+    Assert.assertEquals("two merged TimelineEntities needs to be published", 2,
+        client.getNumOfTimelineEntitiesPublished());
+    TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
+    Assert.assertEquals(
+        "Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2,
+        secondPublishedEntities.getEntities().size());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
+        secondPublishedEntities.getEntities().get(0).getId());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
+        secondPublishedEntities.getEntities().get(1).getId());
+  }
+
+  @Test
+  public void testSyncCall() throws Exception {
+    try {
+      // sync entity should not be be merged with Async
+      client.putEntities(generateEntity("1"));
+      client.putEntitiesAsync(generateEntity("2"));
+      client.putEntitiesAsync(generateEntity("3"));
+      // except for the sync call above 2 should be merged
+      client.putEntities(generateEntity("4"));
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected");
+    }
+    for (int i = 0; i < 4; i++) {
+      if (client.getNumOfTimelineEntitiesPublished() == 3) {
+        break;
+      }
+      Thread.sleep(TIME_TO_SLEEP);
+    }
+    printReceivedEntities();
+    Assert.assertEquals("TimelineEntities not published as desired", 3,
+        client.getNumOfTimelineEntitiesPublished());
+    TimelineEntities firstPublishedEntities = client.getPublishedEntities(0);
+    Assert.assertEquals("sync entities should not be merged with async", 1,
+        firstPublishedEntities.getEntities().size());
+
+    // test before pushing the sync entities asyncs are merged and pushed
+    TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
+    Assert.assertEquals(
+        "async entities should be merged before publishing sync", 2,
+        secondPublishedEntities.getEntities().size());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
+        secondPublishedEntities.getEntities().get(0).getId());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
+        secondPublishedEntities.getEntities().get(1).getId());
+
+    // test the last entity published is sync put
+    TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2);
+    Assert.assertEquals("sync entities had to be published at the last", 1,
+        thirdPublishedEntities.getEntities().size());
+    Assert.assertEquals("Expected last sync Event is not proper", "4",
+        thirdPublishedEntities.getEntities().get(0).getId());
+  }
+
+  @Test
+  public void testExceptionCalls() throws Exception {
+    client.setThrowException(true);
+    try {
+      client.putEntitiesAsync(generateEntity("1"));
+    } catch (YarnException e) {
+      Assert.fail("Async calls are not expected to throw exception");
+    }
+
+    try {
+      client.putEntities(generateEntity("2"));
+      Assert.fail("Sync calls are expected to throw exception");
+    } catch (YarnException e) {
+      Assert.assertEquals("Same exception needs to be thrown",
+          "ActualException", e.getCause().getMessage());
+    }
+  }
+
+  @Test
+  public void testConfigurableNumberOfMerges() throws Exception {
+    client.setSleepBeforeReturn(true);
+    try {
+      // At max 3 entities need to be merged
+      client.putEntitiesAsync(generateEntity("1"));
+      client.putEntitiesAsync(generateEntity("2"));
+      client.putEntitiesAsync(generateEntity("3"));
+      client.putEntitiesAsync(generateEntity("4"));
+      client.putEntities(generateEntity("5"));
+      client.putEntitiesAsync(generateEntity("6"));
+      client.putEntitiesAsync(generateEntity("7"));
+      client.putEntitiesAsync(generateEntity("8"));
+      client.putEntitiesAsync(generateEntity("9"));
+      client.putEntitiesAsync(generateEntity("10"));
+    } catch (YarnException e) {
+      Assert.fail("No exception expected");
+    }
+    // not having the same logic here as it doesn't depend on how many times
+    // events are published.
+    Thread.sleep(2 * TIME_TO_SLEEP);
+    printReceivedEntities();
+    for (TimelineEntities publishedEntities : client.publishedEntities) {
+      Assert.assertTrue(
+          "Number of entities should not be greater than 3 for each publish,"
+              + " but was " + publishedEntities.getEntities().size(),
+          publishedEntities.getEntities().size() <= 3);
+    }
+  }
+
+  @Test
+  public void testAfterStop() throws Exception {
+    client.setSleepBeforeReturn(true);
+    try {
+      // At max 3 entities need to be merged
+      client.putEntities(generateEntity("1"));
+      for (int i = 2; i < 20; i++) {
+        client.putEntitiesAsync(generateEntity("" + i));
+      }
+      client.stop();
+      try {
+        client.putEntitiesAsync(generateEntity("50"));
+        Assert.fail("Exception expected");
+      } catch (YarnException e) {
+        // expected
+      }
+    } catch (YarnException e) {
+      Assert.fail("No exception expected");
+    }
+    // not having the same logic here as it doesn't depend on how many times
+    // events are published.
+    for (int i = 0; i < 5; i++) {
+      TimelineEntities publishedEntities =
+          client.publishedEntities.get(client.publishedEntities.size() - 1);
+      TimelineEntity timelineEntity = publishedEntities.getEntities()
+          .get(publishedEntities.getEntities().size() - 1);
+      if (!timelineEntity.getId().equals("19")) {
+        Thread.sleep(2 * TIME_TO_SLEEP);
+      }
+    }
+    printReceivedEntities();
+    TimelineEntities publishedEntities =
+        client.publishedEntities.get(client.publishedEntities.size() - 1);
+    TimelineEntity timelineEntity = publishedEntities.getEntities()
+        .get(publishedEntities.getEntities().size() - 1);
+    Assert.assertEquals("", "19", timelineEntity.getId());
+  }
+
+  private void printReceivedEntities() {
+    for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) {
+      TimelineEntities publishedEntities = client.getPublishedEntities(i);
+      StringBuilder entitiesPerPublish = new StringBuilder();
+      ;
+      for (TimelineEntity entity : publishedEntities.getEntities()) {
+        entitiesPerPublish.append(entity.getId());
+        entitiesPerPublish.append(",");
+      }
+      LOG.info("Entities Published @ index " + i + " : "
+          + entitiesPerPublish.toString());
+    }
+  }
+
+  private static TimelineEntity generateEntity(String id) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(id);
+    entity.setType("testEntity");
+    entity.setCreatedTime(System.currentTimeMillis());
+    return entity;
+  }
+
+  @After
+  public void tearDown() {
+    if (client != null) {
+      client.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 544a6f5..20ca7f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -876,7 +876,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         Map<ApplicationId, String> knownCollectorsMap =
             response.getAppCollectorsMap();
         if (knownCollectorsMap == null) {
-          LOG.warn("the collectors map is null");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No collectors to update RM");
+          }
         } else {
           Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
               knownCollectorsMap.entrySet();