You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:49:15 UTC
[29/50] [abbrv] hadoop git commit: YARN-5018. Online aggregation
logic should not run immediately after collectors got started (Li Lu via
sjlee)
YARN-5018. Online aggregation logic should not run immediately after collectors got started (Li Lu via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bab078d7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bab078d7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bab078d7
Branch: refs/heads/YARN-2928
Commit: bab078d70091057a2fc7b8873e183d43365503f5
Parents: ef12e0e
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue May 24 11:02:56 2016 -0700
Committer: Vrushali <vr...@twitter.com>
Committed: Sun Jun 19 00:20:10 2016 -0700
----------------------------------------------------------------------
.../RMTimelineCollectorManager.java | 2 +-
.../collector/AppLevelTimelineCollector.java | 17 +++++++++++++++--
.../collector/NodeTimelineCollectorManager.java | 2 +-
.../collector/TimelineCollector.java | 12 +++++++++++-
.../collector/TimelineCollectorManager.java | 18 +++++++++++++++++-
5 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
index a4f1084..64c3749 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -49,7 +49,7 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
}
@Override
- public void postPut(ApplicationId appId, TimelineCollector collector) {
+ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
RMApp app = rmContext.getRMApps().get(appId);
if (app == null) {
throw new YarnRuntimeException(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index eb05262..d276269 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.base.Preconditions;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -93,7 +94,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
new ThreadFactoryBuilder()
.setNameFormat("TimelineCollector Aggregation thread #%d")
.build());
- appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0,
+ appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(),
+ AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
TimeUnit.SECONDS);
super.serviceStart();
@@ -126,10 +128,21 @@ public class AppLevelTimelineCollector extends TimelineCollector {
if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregating");
}
+ if (!isReadyToAggregate()) {
+ LOG.warn("App-level collector is not ready, skip aggregation. ");
+ return;
+ }
try {
TimelineCollectorContext currContext = getTimelineEntityContext();
+ Map<String, AggregationStatusTable> aggregationGroups
+ = getAggregationGroups();
+ if (aggregationGroups == null
+ || aggregationGroups.isEmpty()) {
+ LOG.debug("App-level collector is empty, skip aggregation. ");
+ return;
+ }
TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
- getAggregationGroups(), currContext.getAppId(),
+ aggregationGroups, currContext.getAppId(),
TimelineEntityType.YARN_APPLICATION.toString());
TimelineEntities entities = new TimelineEntities();
entities.addEntity(resultEntity);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
index 75557a8..0323d7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -87,7 +87,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
}
@Override
- public void postPut(ApplicationId appId, TimelineCollector collector) {
+ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
try {
// Get context info from NM
updateTimelineCollectorContext(appId, collector);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 8cd645c..2fc3033 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -60,6 +60,8 @@ public abstract class TimelineCollector extends CompositeService {
private static Set<String> entityTypesSkipAggregation
= new HashSet<>();
+ private volatile boolean readyToAggregate = false;
+
public TimelineCollector(String name) {
super(name);
}
@@ -91,6 +93,14 @@ public abstract class TimelineCollector extends CompositeService {
return aggregationGroups;
}
+ protected void setReadyToAggregate() {
+ readyToAggregate = true;
+ }
+
+ protected boolean isReadyToAggregate() {
+ return readyToAggregate;
+ }
+
/**
* Method to decide the set of timeline entity types the collector should
* skip on aggregations. Subclasses may want to override this method to
@@ -258,7 +268,7 @@ public abstract class TimelineCollector extends CompositeService {
// Note: In memory aggregation is performed in an eventually consistent
// fashion.
- private static class AggregationStatusTable {
+ protected static class AggregationStatusTable {
// On aggregation, for each metric, aggregate all per-entity accumulated
// metrics. We only use the id and type for TimelineMetrics in the key set
// of this table.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 8f74ffb..a8f88e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -136,8 +136,24 @@ public class TimelineCollectorManager extends AbstractService {
return collectorInTable;
}
- protected void postPut(ApplicationId appId, TimelineCollector collector) {
+ /**
+ * Callback handler for the timeline collector manager when a collector has
+ * been added into the collector map.
+ * @param appId Application id of the collector.
+ * @param collector The actual timeline collector that has been added.
+ */
+ public void postPut(ApplicationId appId, TimelineCollector collector) {
+ doPostPut(appId, collector);
+ collector.setReadyToAggregate();
+ }
+ /**
+ * A template method that will be called by
+ * {@link #postPut(ApplicationId, TimelineCollector)}.
+ * @param appId Application id of the collector.
+ * @param collector The actual timeline collector that has been added.
+ */
+ protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org