You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/02/09 18:07:57 UTC
hadoop git commit: YARN-3367. Replace starting a separate thread for
post entity with event loop in TimelineClient (Naganarasimha G R via sjlee)
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 db76a3ad0 -> d491ef080
YARN-3367. Replace starting a separate thread for post entity with event loop in TimelineClient (Naganarasimha G R via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d491ef08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d491ef08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d491ef08
Branch: refs/heads/YARN-2928
Commit: d491ef080096c62964b8327555bf47ceae6e9292
Parents: db76a3a
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue Feb 9 09:07:37 2016 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Feb 9 09:07:37 2016 -0800
----------------------------------------------------------------------
.../jobhistory/JobHistoryEventHandler.java | 61 +---
.../mapred/JobHistoryFileReplayMapper.java | 8 +-
.../hadoop/mapred/TimelineEntityConverter.java | 12 +-
hadoop-yarn-project/CHANGES.txt | 3 +
.../timelineservice/TimelineEntities.java | 17 +-
.../hadoop/yarn/conf/YarnConfiguration.java | 6 +
.../distributedshell/ApplicationMaster.java | 78 +----
.../api/async/impl/AMRMClientAsyncImpl.java | 26 +-
.../hadoop/yarn/client/api/TimelineClient.java | 8 +-
.../client/api/impl/TimelineClientImpl.java | 286 ++++++++++++++---
.../src/main/resources/yarn-default.xml | 7 +
.../api/impl/TestTimelineClientV2Impl.java | 304 +++++++++++++++++++
.../nodemanager/NodeStatusUpdaterImpl.java | 4 +-
13 files changed, 623 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 6e5afb1..1c5446f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -27,10 +27,7 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -75,7 +72,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
@@ -129,10 +125,6 @@ public class JobHistoryEventHandler extends AbstractService
private boolean timelineServiceV2Enabled = false;
- // For posting entities in new timeline service in a non-blocking way
- // TODO YARN-3367 replace with event loop in TimelineClient.
- private ExecutorService threadPool;
-
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE =
@@ -272,10 +264,6 @@ public class JobHistoryEventHandler extends AbstractService
YarnConfiguration.timelineServiceV2Enabled(conf);
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
- if (timelineServiceV2Enabled) {
- // initialize the thread pool for v.2 timeline service
- threadPool = createThreadPool();
- }
} else {
LOG.info("Timeline service is not enabled");
}
@@ -449,35 +437,9 @@ public class JobHistoryEventHandler extends AbstractService
if (timelineClient != null) {
timelineClient.stop();
}
- if (threadPool != null) {
- shutdownAndAwaitTermination();
- }
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}
-
- // TODO remove threadPool after adding non-blocking call in TimelineClient
- private ExecutorService createThreadPool() {
- return Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
- .build());
- }
-
- private void shutdownAndAwaitTermination() {
- threadPool.shutdown();
- try {
- if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
- threadPool.shutdownNow();
- if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
- LOG.error("ThreadPool did not terminate");
- }
- }
- } catch (InterruptedException ie) {
- threadPool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
@@ -1072,21 +1034,6 @@ public class JobHistoryEventHandler extends AbstractService
}
}
- private void putEntityWithoutBlocking(final TimelineClient client,
- final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
- Runnable publishWrapper = new Runnable() {
- public void run() {
- try {
- client.putEntities(entity);
- } catch (IOException|YarnException e) {
- LOG.error("putEntityNonBlocking get failed: " + e);
- throw new RuntimeException(e.toString());
- }
- }
- };
- threadPool.execute(publishWrapper);
- }
-
// create JobEntity from HistoryEvent with adding other info, like:
// jobId, timestamp and entityType.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@@ -1247,7 +1194,13 @@ public class JobHistoryEventHandler extends AbstractService
taskId, setCreatedTime);
}
}
- putEntityWithoutBlocking(timelineClient, tEntity);
+ try {
+ timelineClient.putEntitiesAsync(tEntity);
+ } catch (IOException | YarnException e) {
+ LOG.error("Failed to process Event " + event.getEventType()
+ + " for the job : " + jobId, e);
+ }
+
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
index 802b78f..4fb5308 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -176,7 +176,7 @@ class JobHistoryFileReplayMapper extends EntityWriter {
// create entities from job history and write them
long totalTime = 0;
- Set<TimelineEntity> entitySet =
+ List<TimelineEntity> entitySet =
converter.createTimelineEntities(jobInfo, jobConf);
LOG.info("converted them into timeline entities for job " + jobIdStr);
// use the current user for this purpose
@@ -215,7 +215,7 @@ class JobHistoryFileReplayMapper extends EntityWriter {
}
private void writeAllEntities(AppLevelTimelineCollector collector,
- Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+ List<TimelineEntity> entitySet, UserGroupInformation ugi)
throws IOException {
TimelineEntities entities = new TimelineEntities();
entities.setEntities(entitySet);
@@ -223,7 +223,7 @@ class JobHistoryFileReplayMapper extends EntityWriter {
}
private void writePerEntity(AppLevelTimelineCollector collector,
- Set<TimelineEntity> entitySet, UserGroupInformation ugi)
+ List<TimelineEntity> entitySet, UserGroupInformation ugi)
throws IOException {
for (TimelineEntity entity : entitySet) {
TimelineEntities entities = new TimelineEntities();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
index 880014b..0e2eb72 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.mapred;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -57,16 +59,16 @@ class TimelineEntityConverter {
* Note that we also do not add info to the YARN application entity, which
* would be needed for aggregation.
*/
- public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
+ public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo,
Configuration conf) {
- Set<TimelineEntity> entities = new HashSet<>();
+ List<TimelineEntity> entities = new ArrayList<>();
// create the job entity
TimelineEntity job = createJobEntity(jobInfo, conf);
entities.add(job);
// create the task and task attempt entities
- Set<TimelineEntity> tasksAndAttempts =
+ List<TimelineEntity> tasksAndAttempts =
createTaskAndTaskAttemptEntities(jobInfo);
entities.addAll(tasksAndAttempts);
@@ -125,9 +127,9 @@ class TimelineEntityConverter {
}
}
- private Set<TimelineEntity> createTaskAndTaskAttemptEntities(
+ private List<TimelineEntity> createTaskAndTaskAttemptEntities(
JobInfo jobInfo) {
- Set<TimelineEntity> entities = new HashSet<>();
+ List<TimelineEntity> entities = new ArrayList<>();
Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks();
LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() +
" tasks");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f6bf667..4c77b67 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -167,6 +167,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4446. Refactor reader API for better extensibility (Varun Saxena via
sjlee)
+ YARN-3367. Replace starting a separate thread for post entity with event
+ loop in TimelineClient (Naganarasimha G R via sjlee)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
index f08a0ec..63989e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
@@ -17,15 +17,16 @@
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import java.util.ArrayList;
+import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
-import java.util.HashSet;
-import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
/**
* This class hosts a set of timeline entities.
@@ -36,22 +37,22 @@ import java.util.Set;
@InterfaceStability.Unstable
public class TimelineEntities {
- private Set<TimelineEntity> entities = new HashSet<>();
+ private List<TimelineEntity> entities = new ArrayList<>();
public TimelineEntities() {
}
@XmlElement(name = "entities")
- public Set<TimelineEntity> getEntities() {
+ public List<TimelineEntity> getEntities() {
return entities;
}
- public void setEntities(Set<TimelineEntity> timelineEntities) {
+ public void setEntities(List<TimelineEntity> timelineEntities) {
this.entities = timelineEntities;
}
- public void addEntities(Set<TimelineEntity> timelineEntities) {
+ public void addEntities(List<TimelineEntity> timelineEntities) {
this.entities.addAll(timelineEntities);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9b43fbd..6ac6fb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1762,6 +1762,12 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000;
+ public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE =
+ TIMELINE_SERVICE_PREFIX
+ + "timeline-client.number-of-async-entities-to-merge";
+
+ public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10;
+
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 1c68086..cb5f53b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -39,9 +39,6 @@ import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
@@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* An ApplicationMaster for executing shell commands on a set of launched
@@ -219,10 +215,6 @@ public class ApplicationMaster {
private boolean timelineServiceV2 = false;
- // For posting entities in new timeline service in a non-blocking way
- // TODO replace with event loop in TimelineClient.
- private ExecutorService threadPool;
-
// App Master configuration
// No. of containers to run shell command on
@VisibleForTesting
@@ -311,10 +303,6 @@ public class ApplicationMaster {
}
appMaster.run();
result = appMaster.finish();
-
- if (appMaster.threadPool != null) {
- appMaster.shutdownAndAwaitTermination();
- }
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
LogManager.shutdown();
@@ -329,29 +317,6 @@ public class ApplicationMaster {
}
}
- //TODO remove threadPool after adding non-blocking call in TimelineClient
- private ExecutorService createThreadPool() {
- return Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
- .build());
- }
-
- private void shutdownAndAwaitTermination() {
- threadPool.shutdown();
- try {
- // Wait a while for existing tasks to terminate
- if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
- threadPool.shutdownNow();
- if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
- LOG.error("ThreadPool did not terminate");
- }
- } catch (InterruptedException ie) {
- threadPool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
/**
* Dump out contents of $CWD and the environment to stdout for debugging
*/
@@ -547,11 +512,7 @@ public class ApplicationMaster {
.getOptionValue("priority", "0"));
if (YarnConfiguration.timelineServiceEnabled(conf)) {
- timelineServiceV2 =
- YarnConfiguration.timelineServiceV2Enabled(conf);
- if (timelineServiceV2) {
- threadPool = createThreadPool();
- }
+ timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
@@ -701,8 +662,10 @@ public class ApplicationMaster {
if (timelineServiceV2) {
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());
+ LOG.info("Timeline service V2 client is enabled");
} else {
timelineClient = TimelineClient.createTimelineClient();
+ LOG.info("Timeline service V1 client is enabled");
}
timelineClient.init(conf);
timelineClient.start();
@@ -1304,18 +1267,8 @@ public class ApplicationMaster {
shellId);
return new Thread(runnableLaunchContainer);
}
-
- private void publishContainerStartEventOnTimelineServiceV2(
- final Container container) {
- Runnable publishWrapper = new Runnable() {
- public void run() {
- publishContainerStartEventOnTimelineServiceV2Base(container);
- }
- };
- threadPool.execute(publishWrapper);
- }
- private void publishContainerStartEventOnTimelineServiceV2Base(
+ private void publishContainerStartEventOnTimelineServiceV2(
Container container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
@@ -1349,16 +1302,6 @@ public class ApplicationMaster {
private void publishContainerEndEventOnTimelineServiceV2(
final ContainerStatus container) {
- Runnable publishWrapper = new Runnable() {
- public void run() {
- publishContainerEndEventOnTimelineServiceV2Base(container);
- }
- };
- threadPool.execute(publishWrapper);
- }
-
- private void publishContainerEndEventOnTimelineServiceV2Base(
- final ContainerStatus container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString());
@@ -1389,17 +1332,6 @@ public class ApplicationMaster {
}
private void publishApplicationAttemptEventOnTimelineServiceV2(
- final DSEvent appEvent) {
-
- Runnable publishWrapper = new Runnable() {
- public void run() {
- publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
- }
- };
- threadPool.execute(publishWrapper);
- }
-
- private void publishApplicationAttemptEventOnTimelineServiceV2Base(
DSEvent appEvent) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
@@ -1417,7 +1349,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
- timelineClient.putEntities(entity);
+ timelineClient.putEntitiesAsync(entity);
return null;
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 212f721..8af0c78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -41,9 +41,9 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -327,6 +327,19 @@ extends AMRMClientAsync<T> {
LOG.info("Interrupted while waiting for queue", ex);
continue;
}
+
+ String collectorAddress = response.getCollectorAddr();
+ TimelineClient timelineClient = client.getRegisteredTimeineClient();
+ if (timelineClient != null && collectorAddress != null
+ && !collectorAddress.isEmpty()) {
+ if (collectorAddr == null
+ || !collectorAddr.equals(collectorAddress)) {
+ collectorAddr = collectorAddress;
+ timelineClient.setTimelineServiceAddress(collectorAddress);
+ LOG.info("collectorAddress " + collectorAddress);
+ }
+ }
+
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
handler.onNodesUpdated(updatedNodes);
@@ -354,17 +367,6 @@ extends AMRMClientAsync<T> {
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
-
- String collectorAddress = response.getCollectorAddr();
- TimelineClient timelineClient = client.getRegisteredTimeineClient();
- if (timelineClient != null && collectorAddress != null
- && !collectorAddress.isEmpty()) {
- if (collectorAddr == null ||
- !collectorAddr.equals(collectorAddress)) {
- collectorAddr = collectorAddress;
- timelineClient.setTimelineServiceAddress(collectorAddress);
- }
- }
progress = handler.getProgress();
} catch (Throwable ex) {
handler.onError(ex);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index ade4f9a..24d9f32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -27,12 +27,12 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -53,7 +53,7 @@ public abstract class TimelineClient extends AbstractService {
* construct and initialize a timeline client if the following operations are
* supposed to be conducted by that user.
*/
- private ApplicationId contextAppId;
+ protected ApplicationId contextAppId;
/**
* Creates an instance of the timeline v.1.x client.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index a158a56..c8e6481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -29,6 +29,14 @@ import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
@@ -129,6 +137,8 @@ public class TimelineClientImpl extends TimelineClient {
@VisibleForTesting
TimelineClientConnectionRetry connectionRetry;
+ private TimelineEntityDispatcher entityDispatcher;
+
// Abstract class for an operation that should be retried by timeline client
private static abstract class TimelineClientRetryOp {
// The operation that should be retried
@@ -312,6 +322,7 @@ public class TimelineClientImpl extends TimelineClient {
serviceRetryInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+ entityDispatcher = new TimelineEntityDispatcher(conf);
} else {
if (YarnConfiguration.useHttps(conf)) {
setTimelineServiceAddress(conf.get(
@@ -332,7 +343,9 @@ public class TimelineClientImpl extends TimelineClient {
@Override
protected void serviceStart() throws Exception {
- if (!timelineServiceV2) {
+ if (timelineServiceV2) {
+ entityDispatcher.start();
+ } else {
timelineWriter = createTimelineWriter(configuration, authUgi, client,
constructResURI(getConfig(), timelineServiceAddress, false));
}
@@ -354,6 +367,9 @@ public class TimelineClientImpl extends TimelineClient {
if (this.timelineWriter != null) {
this.timelineWriter.close();
}
+ if (timelineServiceV2) {
+ entityDispatcher.stop();
+ }
super.serviceStop();
}
@@ -366,37 +382,21 @@ public class TimelineClientImpl extends TimelineClient {
@Override
public void putEntities(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
- throws IOException, YarnException {
- putEntities(false, entities);
+ throws IOException, YarnException {
+ if (!timelineServiceV2) {
+ throw new YarnException("v.2 method is invoked on a v.1.x client");
+ }
+ entityDispatcher.dispatchEntities(true, entities);
}
@Override
public void putEntitiesAsync(
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
- putEntities(true, entities);
- }
-
- private void putEntities(boolean async,
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
- throws IOException, YarnException {
if (!timelineServiceV2) {
throw new YarnException("v.2 method is invoked on a v.1.x client");
}
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- entitiesContainer =
- new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
- for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) {
- entitiesContainer.addEntity(entity);
- }
- MultivaluedMap<String, String> params = new MultivaluedMapImpl();
- if (getContextAppId() != null) {
- params.add("appid", getContextAppId().toString());
- }
- if (async) {
- params.add("async", Boolean.TRUE.toString());
- }
- putObjects("entities", params, entitiesContainer);
+ entityDispatcher.dispatchEntities(false, entities);
}
@Override
@@ -407,20 +407,10 @@ public class TimelineClientImpl extends TimelineClient {
// Used for new timeline service only
@Private
- public void putObjects(String path, MultivaluedMap<String, String> params,
+ protected void putObjects(String path, MultivaluedMap<String, String> params,
Object obj) throws IOException, YarnException {
- // timelineServiceAddress could haven't be initialized yet
- // or stale (only for new timeline service)
- int retries = pollTimelineServiceAddress(this.maxServiceRetries);
- if (timelineServiceAddress == null) {
- String errMessage = "TimelineClient has reached to max retry times : "
- + this.maxServiceRetries
- + ", but failed to fetch timeline service address. Please verify"
- + " Timeline Auxillary Service is configured in all the NMs";
- LOG.error(errMessage);
- throw new YarnException(errMessage);
- }
+ int retries = verifyRestEndPointAvailable();
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;
@@ -438,6 +428,21 @@ public class TimelineClientImpl extends TimelineClient {
}
}
+ private int verifyRestEndPointAvailable() throws YarnException {
+ // timelineServiceAddress may not have been initialized yet,
+ // or may be stale (only for new timeline service)
+ int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+ if (timelineServiceAddress == null) {
+ String errMessage = "TimelineClient has reached to max retry times : "
+ + this.maxServiceRetries
+ + ", but failed to fetch timeline service address. Please verify"
+ + " Timeline Auxillary Service is configured in all the NMs";
+ LOG.error(errMessage);
+ throw new YarnException(errMessage);
+ }
+ return retries;
+ }
+
/**
* Check if reaching to maximum of retries.
* @param retries
@@ -643,7 +648,7 @@ public class TimelineClientImpl extends TimelineClient {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- timelineServiceAddress = getTimelineServiceAddress();
+ // timelineServiceAddress = getTimelineServiceAddress();
retries--;
}
return retries;
@@ -862,4 +867,213 @@ public class TimelineClientImpl extends TimelineClient {
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
+
+ private final class EntitiesHolder extends FutureTask<Void> {
+ private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities;
+ private final boolean isSync;
+
+ EntitiesHolder(
+ final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities,
+ final boolean isSync) {
+ super(new Callable<Void>() {
+ // publishEntities()
+ public Void call() throws Exception {
+ MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+ params.add("appid", contextAppId.toString());
+ params.add("async", Boolean.toString(!isSync));
+ putObjects("entities", params, entities);
+ return null;
+ }
+ });
+ this.entities = entities;
+ this.isSync = isSync;
+ }
+
+ public boolean isSync() {
+ return isSync;
+ }
+
+ public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() {
+ return entities;
+ }
+ }
+
+ /**
+ * This class is responsible for collecting the timeline entities and
+ * publishing them in async.
+ */
+ private class TimelineEntityDispatcher {
+ /**
+ * Time period, in milliseconds, for which the timeline client will wait
+ * for the queue to drain after stop.
+ */
+ private static final long DRAIN_TIME_PERIOD = 2000L;
+
+ private int numberOfAsyncsToMerge;
+ private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+ private ExecutorService executor;
+
+ TimelineEntityDispatcher(Configuration conf) {
+ timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+ numberOfAsyncsToMerge =
+ conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+ YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+ }
+
+ Runnable createRunnable() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ EntitiesHolder entitiesHolder;
+ while (!Thread.currentThread().isInterrupted()) {
+ // Merge all the async calls and make one push, but if it is a sync
+ // call, push it immediately
+ try {
+ entitiesHolder = timelineEntityQueue.take();
+ } catch (InterruptedException ie) {
+ LOG.info("Timeline dispatcher thread was interrupted ");
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (entitiesHolder != null) {
+ publishWithoutBlockingOnQueue(entitiesHolder);
+ }
+ }
+ } finally {
+ if (!timelineEntityQueue.isEmpty()) {
+ LOG.info("Yet to publish " + timelineEntityQueue.size()
+ + " timelineEntities, draining them now. ");
+ }
+ // Try to drain the remaining entities to be published, for at most
+ // DRAIN_TIME_PERIOD (2 seconds)
+ long timeTillweDrain =
+ System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+ while (!timelineEntityQueue.isEmpty()) {
+ publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+ if (System.currentTimeMillis() > timeTillweDrain) {
+ // time elapsed stop publishing further....
+ if (!timelineEntityQueue.isEmpty()) {
+ LOG.warn("Time to drain elapsed! Remaining "
+ + timelineEntityQueue.size() + "timelineEntities will not"
+ + " be published");
+ // if some entities were not drained then we need to cancel them so
+ // that threads waiting on sync EntitiesHolders in the queue are released.
+ EntitiesHolder nextEntityInTheQueue = null;
+ while ((nextEntityInTheQueue =
+ timelineEntityQueue.poll()) != null) {
+ nextEntityInTheQueue.cancel(true);
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Publishes the given EntitiesHolder and returns immediately if it is a
+ * sync call; otherwise tries to fetch further EntitiesHolders from the
+ * queue in a non-blocking fashion and collates their entities, if
+ * possible, before publishing through REST.
+ *
+ * @param entitiesHolder
+ */
+ private void publishWithoutBlockingOnQueue(
+ EntitiesHolder entitiesHolder) {
+ if (entitiesHolder.isSync()) {
+ entitiesHolder.run();
+ return;
+ }
+ int count = 1;
+ while (true) {
+ // loop till we find a sync put Entities or there is nothing
+ // to take
+ EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+ if (nextEntityInTheQueue == null) {
+ // Nothing in the queue just publish and get back to the
+ // blocked wait state
+ entitiesHolder.run();
+ break;
+ } else if (nextEntityInTheQueue.isSync()) {
+ // flush all the prev async entities first
+ entitiesHolder.run();
+ // and then flush the sync entity
+ nextEntityInTheQueue.run();
+ break;
+ } else {
+ // append all async entities together and then flush
+ entitiesHolder.getEntities().addEntities(
+ nextEntityInTheQueue.getEntities().getEntities());
+ count++;
+ if (count == numberOfAsyncsToMerge) {
+ // Flush the entities if the number of async
+ // putEntities calls merged reaches the desired limit, to
+ // avoid collecting multiple entities and delaying for a
+ // long time.
+ entitiesHolder.run();
+ break;
+ }
+ }
+ }
+ }
+ };
+ }
+
+ public void dispatchEntities(boolean sync,
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished)
+ throws YarnException {
+ if (executor.isShutdown()) {
+ throw new YarnException("Timeline client is in the process of stopping,"
+ + " not accepting any more TimelineEntities");
+ }
+
+ // wrap all TimelineEntity into TimelineEntities object
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities =
+ new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
+ for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) {
+ entities.addEntity(entity);
+ }
+
+ // create a holder and place it in the queue
+ EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+ try {
+ timelineEntityQueue.put(entitiesHolder);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException(
+ "Failed while adding entity to the queue for publishing", e);
+ }
+
+ if (sync) {
+ // In a sync call we need to wait until it is published and, if there is
+ // any error, throw it back
+ try {
+ entitiesHolder.get();
+ } catch (ExecutionException e) {
+ throw new YarnException("Failed while publishing entity",
+ e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while publishing entity", e);
+ }
+ }
+ }
+
+ public void start() {
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(createRunnable());
+ }
+
+ public void stop() {
+ LOG.info("Stopping TimelineClient.");
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b521599..2cbc836 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2081,6 +2081,13 @@
<value>1000</value>
</property>
+ <property>
+ <description>Timeline V2 client tries to merge this many
+ async entities (if available) and then calls the REST ATS V2 API to submit.
+ </description>
+ <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
+ <value>10</value>
+ </property>
<!-- Shared Cache Configuration -->
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
new file mode 100644
index 0000000..7803f94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTimelineClientV2Impl {
+ private static final Log LOG =
+ LogFactory.getLog(TestTimelineClientV2Impl.class);
+ private TestV2TimelineClient client;
+ private static long TIME_TO_SLEEP = 150;
+
+ @Before
+ public void setup() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+ conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
+ client = createTimelineClient(conf);
+ }
+
+ private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
+ ApplicationId id = ApplicationId.newInstance(0, 0);
+ TestV2TimelineClient client = new TestV2TimelineClient(id);
+ client.init(conf);
+ client.start();
+ return client;
+ }
+
+ private class TestV2TimelineClient extends TimelineClientImpl {
+ private boolean sleepBeforeReturn;
+ private boolean throwException;
+
+ private List<TimelineEntities> publishedEntities;
+
+ public TimelineEntities getPublishedEntities(int putIndex) {
+ Assert.assertTrue("Not So many entities Published",
+ putIndex < publishedEntities.size());
+ return publishedEntities.get(putIndex);
+ }
+
+ public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
+ this.sleepBeforeReturn = sleepBeforeReturn;
+ }
+
+ public void setThrowException(boolean throwException) {
+ this.throwException = throwException;
+ }
+
+ public int getNumOfTimelineEntitiesPublished() {
+ return publishedEntities.size();
+ }
+
+ public TestV2TimelineClient(ApplicationId id) {
+ super(id);
+ publishedEntities = new ArrayList<TimelineEntities>();
+ }
+
+ protected void putObjects(String path,
+ MultivaluedMap<String, String> params, Object obj)
+ throws IOException, YarnException {
+ if (throwException) {
+ throw new YarnException("ActualException");
+ }
+ publishedEntities.add((TimelineEntities) obj);
+ if (sleepBeforeReturn) {
+ try {
+ Thread.sleep(TIME_TO_SLEEP);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testPostEntities() throws Exception {
+ try {
+ client.putEntities(generateEntity("1"));
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected");
+ }
+ }
+
+ @Test
+ public void testASyncCallMerge() throws Exception {
+ client.setSleepBeforeReturn(true);
+ try {
+ client.putEntitiesAsync(generateEntity("1"));
+ Thread.sleep(TIME_TO_SLEEP / 2);
+ // by the time first put response comes push 2 entities in the queue
+ client.putEntitiesAsync(generateEntity("2"));
+ client.putEntitiesAsync(generateEntity("3"));
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected");
+ }
+ for (int i = 0; i < 4; i++) {
+ if (client.getNumOfTimelineEntitiesPublished() == 2) {
+ break;
+ }
+ Thread.sleep(TIME_TO_SLEEP);
+ }
+ Assert.assertEquals("two merged TimelineEntities needs to be published", 2,
+ client.getNumOfTimelineEntitiesPublished());
+ TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
+ Assert.assertEquals(
+ "Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2,
+ secondPublishedEntities.getEntities().size());
+ Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
+ secondPublishedEntities.getEntities().get(0).getId());
+ Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
+ secondPublishedEntities.getEntities().get(1).getId());
+ }
+
+ @Test
+ public void testSyncCall() throws Exception {
+ try {
+ // sync entity should not be merged with async
+ client.putEntities(generateEntity("1"));
+ client.putEntitiesAsync(generateEntity("2"));
+ client.putEntitiesAsync(generateEntity("3"));
+ // apart from the sync call above, the 2 async calls should be merged
+ client.putEntities(generateEntity("4"));
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected");
+ }
+ for (int i = 0; i < 4; i++) {
+ if (client.getNumOfTimelineEntitiesPublished() == 3) {
+ break;
+ }
+ Thread.sleep(TIME_TO_SLEEP);
+ }
+ printReceivedEntities();
+ Assert.assertEquals("TimelineEntities not published as desired", 3,
+ client.getNumOfTimelineEntitiesPublished());
+ TimelineEntities firstPublishedEntities = client.getPublishedEntities(0);
+ Assert.assertEquals("sync entities should not be merged with async", 1,
+ firstPublishedEntities.getEntities().size());
+
+ // test that async entities are merged and pushed before the sync entity
+ TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
+ Assert.assertEquals(
+ "async entities should be merged before publishing sync", 2,
+ secondPublishedEntities.getEntities().size());
+ Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
+ secondPublishedEntities.getEntities().get(0).getId());
+ Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
+ secondPublishedEntities.getEntities().get(1).getId());
+
+ // test the last entity published is sync put
+ TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2);
+ Assert.assertEquals("sync entities had to be published at the last", 1,
+ thirdPublishedEntities.getEntities().size());
+ Assert.assertEquals("Expected last sync Event is not proper", "4",
+ thirdPublishedEntities.getEntities().get(0).getId());
+ }
+
+ @Test
+ public void testExceptionCalls() throws Exception {
+ client.setThrowException(true);
+ try {
+ client.putEntitiesAsync(generateEntity("1"));
+ } catch (YarnException e) {
+ Assert.fail("Async calls are not expected to throw exception");
+ }
+
+ try {
+ client.putEntities(generateEntity("2"));
+ Assert.fail("Sync calls are expected to throw exception");
+ } catch (YarnException e) {
+ Assert.assertEquals("Same exception needs to be thrown",
+ "ActualException", e.getCause().getMessage());
+ }
+ }
+
+ @Test
+ public void testConfigurableNumberOfMerges() throws Exception {
+ client.setSleepBeforeReturn(true);
+ try {
+ // At max 3 entities need to be merged
+ client.putEntitiesAsync(generateEntity("1"));
+ client.putEntitiesAsync(generateEntity("2"));
+ client.putEntitiesAsync(generateEntity("3"));
+ client.putEntitiesAsync(generateEntity("4"));
+ client.putEntities(generateEntity("5"));
+ client.putEntitiesAsync(generateEntity("6"));
+ client.putEntitiesAsync(generateEntity("7"));
+ client.putEntitiesAsync(generateEntity("8"));
+ client.putEntitiesAsync(generateEntity("9"));
+ client.putEntitiesAsync(generateEntity("10"));
+ } catch (YarnException e) {
+ Assert.fail("No exception expected");
+ }
+ // not having the same logic here as it doesn't depend on how many times
+ // events are published.
+ Thread.sleep(2 * TIME_TO_SLEEP);
+ printReceivedEntities();
+ for (TimelineEntities publishedEntities : client.publishedEntities) {
+ Assert.assertTrue(
+ "Number of entities should not be greater than 3 for each publish,"
+ + " but was " + publishedEntities.getEntities().size(),
+ publishedEntities.getEntities().size() <= 3);
+ }
+ }
+
+ @Test
+ public void testAfterStop() throws Exception {
+ client.setSleepBeforeReturn(true);
+ try {
+ // At max 3 entities need to be merged
+ client.putEntities(generateEntity("1"));
+ for (int i = 2; i < 20; i++) {
+ client.putEntitiesAsync(generateEntity("" + i));
+ }
+ client.stop();
+ try {
+ client.putEntitiesAsync(generateEntity("50"));
+ Assert.fail("Exception expected");
+ } catch (YarnException e) {
+ // expected
+ }
+ } catch (YarnException e) {
+ Assert.fail("No exception expected");
+ }
+ // not having the same logic here as it doesn't depend on how many times
+ // events are published.
+ for (int i = 0; i < 5; i++) {
+ TimelineEntities publishedEntities =
+ client.publishedEntities.get(client.publishedEntities.size() - 1);
+ TimelineEntity timelineEntity = publishedEntities.getEntities()
+ .get(publishedEntities.getEntities().size() - 1);
+ if (!timelineEntity.getId().equals("19")) {
+ Thread.sleep(2 * TIME_TO_SLEEP);
+ }
+ }
+ printReceivedEntities();
+ TimelineEntities publishedEntities =
+ client.publishedEntities.get(client.publishedEntities.size() - 1);
+ TimelineEntity timelineEntity = publishedEntities.getEntities()
+ .get(publishedEntities.getEntities().size() - 1);
+ Assert.assertEquals("", "19", timelineEntity.getId());
+ }
+
+ private void printReceivedEntities() {
+ for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) {
+ TimelineEntities publishedEntities = client.getPublishedEntities(i);
+ StringBuilder entitiesPerPublish = new StringBuilder();
+ ;
+ for (TimelineEntity entity : publishedEntities.getEntities()) {
+ entitiesPerPublish.append(entity.getId());
+ entitiesPerPublish.append(",");
+ }
+ LOG.info("Entities Published @ index " + i + " : "
+ + entitiesPerPublish.toString());
+ }
+ }
+
+ private static TimelineEntity generateEntity(String id) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setId(id);
+ entity.setType("testEntity");
+ entity.setCreatedTime(System.currentTimeMillis());
+ return entity;
+ }
+
+ @After
+ public void tearDown() {
+ if (client != null) {
+ client.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 544a6f5..20ca7f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -876,7 +876,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
Map<ApplicationId, String> knownCollectorsMap =
response.getAppCollectorsMap();
if (knownCollectorsMap == null) {
- LOG.warn("the collectors map is null");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No collectors to update RM");
+ }
} else {
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
knownCollectorsMap.entrySet();