You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:49:15 UTC

[29/50] [abbrv] hadoop git commit: YARN-5018. Online aggregation logic should not run immediately after collectors got started (Li Lu via sjlee)

YARN-5018. Online aggregation logic should not run immediately after collectors got started (Li Lu via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bab078d7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bab078d7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bab078d7

Branch: refs/heads/YARN-2928
Commit: bab078d70091057a2fc7b8873e183d43365503f5
Parents: ef12e0e
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue May 24 11:02:56 2016 -0700
Committer: Vrushali <vr...@twitter.com>
Committed: Sun Jun 19 00:20:10 2016 -0700

----------------------------------------------------------------------
 .../RMTimelineCollectorManager.java               |  2 +-
 .../collector/AppLevelTimelineCollector.java      | 17 +++++++++++++++--
 .../collector/NodeTimelineCollectorManager.java   |  2 +-
 .../collector/TimelineCollector.java              | 12 +++++++++++-
 .../collector/TimelineCollectorManager.java       | 18 +++++++++++++++++-
 5 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
index a4f1084..64c3749 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -49,7 +49,7 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
   }
 
   @Override
-  public void postPut(ApplicationId appId, TimelineCollector collector) {
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
     RMApp app = rmContext.getRMApps().get(appId);
     if (app == null) {
       throw new YarnRuntimeException(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index eb05262..d276269 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import com.google.common.base.Preconditions;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
         new ThreadFactoryBuilder()
             .setNameFormat("TimelineCollector Aggregation thread #%d")
             .build());
-    appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0,
+    appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(),
+        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
         TimeUnit.SECONDS);
     super.serviceStart();
@@ -126,10 +128,21 @@ public class AppLevelTimelineCollector extends TimelineCollector {
       if (LOG.isDebugEnabled()) {
         LOG.debug("App-level real-time aggregating");
       }
+      if (!isReadyToAggregate()) {
+        LOG.warn("App-level collector is not ready, skip aggregation. ");
+        return;
+      }
       try {
         TimelineCollectorContext currContext = getTimelineEntityContext();
+        Map<String, AggregationStatusTable> aggregationGroups
+            = getAggregationGroups();
+        if (aggregationGroups == null
+            || aggregationGroups.isEmpty()) {
+          LOG.debug("App-level collector is empty, skip aggregation. ");
+          return;
+        }
         TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
-            getAggregationGroups(), currContext.getAppId(),
+            aggregationGroups, currContext.getAppId(),
             TimelineEntityType.YARN_APPLICATION.toString());
         TimelineEntities entities = new TimelineEntities();
         entities.addEntity(resultEntity);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
index 75557a8..0323d7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -87,7 +87,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
   }
 
   @Override
-  public void postPut(ApplicationId appId, TimelineCollector collector) {
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
     try {
       // Get context info from NM
       updateTimelineCollectorContext(appId, collector);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 8cd645c..2fc3033 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -60,6 +60,8 @@ public abstract class TimelineCollector extends CompositeService {
   private static Set<String> entityTypesSkipAggregation
       = new HashSet<>();
 
+  private volatile boolean readyToAggregate = false;
+
   public TimelineCollector(String name) {
     super(name);
   }
@@ -91,6 +93,14 @@ public abstract class TimelineCollector extends CompositeService {
     return aggregationGroups;
   }
 
+  protected void setReadyToAggregate() {
+    readyToAggregate = true;
+  }
+
+  protected boolean isReadyToAggregate() {
+    return readyToAggregate;
+  }
+
   /**
    * Method to decide the set of timeline entity types the collector should
    * skip on aggregations. Subclasses may want to override this method to
@@ -258,7 +268,7 @@ public abstract class TimelineCollector extends CompositeService {
 
   // Note: In memory aggregation is performed in an eventually consistent
   // fashion.
-  private static class AggregationStatusTable {
+  protected static class AggregationStatusTable {
     // On aggregation, for each metric, aggregate all per-entity accumulated
     // metrics. We only use the id and type for TimelineMetrics in the key set
     // of this table.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab078d7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 8f74ffb..a8f88e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -136,8 +136,24 @@ public class TimelineCollectorManager extends AbstractService {
     return collectorInTable;
   }
 
-  protected void postPut(ApplicationId appId, TimelineCollector collector) {
+  /**
+   * Callback handler for the timeline collector manager when a collector has
+   * been added into the collector map.
+   * @param appId Application id of the collector.
+   * @param collector The actual timeline collector that has been added.
+   */
+  public void postPut(ApplicationId appId, TimelineCollector collector) {
+    doPostPut(appId, collector);
+    collector.setReadyToAggregate();
+  }
 
+  /**
+   * A template method that will be called by
+   * {@link #postPut(ApplicationId, TimelineCollector)}.
+   * @param appId Application id of the collector.
+   * @param collector The actual timeline collector that has been added.
+   */
+  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org