You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/03 06:58:17 UTC

[40/50] [abbrv] hadoop git commit: YARN-6888. Refactor AppLevelTimelineCollector such that RM does not have aggregator threads created. Contributed by Vrushali C.

YARN-6888. Refactor AppLevelTimelineCollector such that RM does not have aggregator threads created. Contributed by Vrushali C.

(cherry picked from commit 20dd6d18b7787e67ef96f3b6b92ea8415a8650fd)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb4f4403
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb4f4403
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb4f4403

Branch: refs/heads/YARN-5355_branch2
Commit: bb4f44038c6244116d27e0b0c43f8fd79d0063e7
Parents: 1ee2a98
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Fri Jul 28 11:47:16 2017 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Fri Jul 28 11:51:23 2017 +0530

----------------------------------------------------------------------
 .../collector/AppLevelTimelineCollector.java    |  87 +----------
 .../AppLevelTimelineCollectorWithAgg.java       | 150 +++++++++++++++++++
 .../PerNodeTimelineCollectorsAuxService.java    |   2 +-
 .../TestNMTimelineCollectorManager.java         |   4 +-
 4 files changed, 155 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb4f4403/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 0b05309..c481dbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,19 +25,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.google.common.base.Preconditions;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Service that handles writes to the timeline service and writes them to the
  * backing storage for a given YARN application.
@@ -50,15 +40,8 @@ import java.util.concurrent.TimeUnit;
 public class AppLevelTimelineCollector extends TimelineCollector {
   private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
 
-  private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
-  private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
-  private static Set<String> entityTypesSkipAggregation
-      = initializeSkipSet();
-
   private final ApplicationId appId;
   private final TimelineCollectorContext context;
-  private ScheduledThreadPoolExecutor appAggregationExecutor;
-  private AppLevelAggregator appAggregator;
   private UserGroupInformation currentUser;
 
   public AppLevelTimelineCollector(ApplicationId appId) {
@@ -68,12 +51,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     context = new TimelineCollectorContext();
   }
 
-  private static Set<String> initializeSkipSet() {
-    Set<String> result = new HashSet<>();
-    result.add(TimelineEntityType.YARN_APPLICATION.toString());
-    result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
-    result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
-    return result;
+  public UserGroupInformation getCurrentUser() {
+    return currentUser;
   }
 
   @Override
@@ -91,29 +70,11 @@ public class AppLevelTimelineCollector extends TimelineCollector {
 
   @Override
   protected void serviceStart() throws Exception {
-    // Launch the aggregation thread
-    appAggregationExecutor = new ScheduledThreadPoolExecutor(
-        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
-        new ThreadFactoryBuilder()
-            .setNameFormat("TimelineCollector Aggregation thread #%d")
-            .build());
-    appAggregator = new AppLevelAggregator();
-    appAggregationExecutor.scheduleAtFixedRate(appAggregator,
-        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
-        AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
-        TimeUnit.SECONDS);
     super.serviceStart();
   }
 
   @Override
   protected void serviceStop() throws Exception {
-    appAggregationExecutor.shutdown();
-    if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
-      LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
-      appAggregationExecutor.shutdownNow();
-    }
-    // Perform one round of aggregation after the aggregation executor is done.
-    appAggregator.aggregate();
     super.serviceStop();
   }
 
@@ -122,48 +83,4 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     return context;
   }
 
-  @Override
-  protected Set<String> getEntityTypesSkipAggregation() {
-    return entityTypesSkipAggregation;
-  }
-
-  private class AppLevelAggregator implements Runnable {
-
-    private void aggregate() {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("App-level real-time aggregating");
-      }
-      if (!isReadyToAggregate()) {
-        LOG.warn("App-level collector is not ready, skip aggregation. ");
-        return;
-      }
-      try {
-        TimelineCollectorContext currContext = getTimelineEntityContext();
-        Map<String, AggregationStatusTable> aggregationGroups
-            = getAggregationGroups();
-        if (aggregationGroups == null
-            || aggregationGroups.isEmpty()) {
-          LOG.debug("App-level collector is empty, skip aggregation. ");
-          return;
-        }
-        TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
-            aggregationGroups, currContext.getAppId(),
-            TimelineEntityType.YARN_APPLICATION.toString());
-        TimelineEntities entities = new TimelineEntities();
-        entities.addEntity(resultEntity);
-        putEntitiesAsync(entities, currentUser);
-      } catch (Exception e) {
-        LOG.error("Error aggregating timeline metrics", e);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("App-level real-time aggregation complete");
-      }
-    }
-
-    @Override
-    public void run() {
-      aggregate();
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb4f4403/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
new file mode 100644
index 0000000..ac91275
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service that handles aggregations for applications
+ * and makes use of {@link AppLevelTimelineCollector} class for
+ * writes to Timeline Service.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelTimelineCollectorWithAgg
+    extends AppLevelTimelineCollector {
+  private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+
+  private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
+  private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
+  private static Set<String> entityTypesSkipAggregation
+      = initializeSkipSet();
+
+  private ScheduledThreadPoolExecutor appAggregationExecutor;
+  private AppLevelAggregator appAggregator;
+
+  public AppLevelTimelineCollectorWithAgg(ApplicationId appId) {
+    super(appId);
+  }
+
+  private static Set<String> initializeSkipSet() {
+    Set<String> result = new HashSet<>();
+    result.add(TimelineEntityType.YARN_APPLICATION.toString());
+    result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
+    result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+    return result;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Launch the aggregation thread
+    appAggregationExecutor = new ScheduledThreadPoolExecutor(
+        AppLevelTimelineCollectorWithAgg.AGGREGATION_EXECUTOR_NUM_THREADS,
+        new ThreadFactoryBuilder()
+            .setNameFormat("TimelineCollector Aggregation thread #%d")
+            .build());
+    appAggregator = new AppLevelAggregator();
+    appAggregationExecutor.scheduleAtFixedRate(appAggregator,
+        AppLevelTimelineCollectorWithAgg.
+          AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
+        AppLevelTimelineCollectorWithAgg.
+          AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
+        TimeUnit.SECONDS);
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    appAggregationExecutor.shutdown();
+    if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+      LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
+      appAggregationExecutor.shutdownNow();
+    }
+    // Perform one round of aggregation after the aggregation executor is done.
+    appAggregator.aggregate();
+    super.serviceStop();
+  }
+
+  @Override
+  protected Set<String> getEntityTypesSkipAggregation() {
+    return entityTypesSkipAggregation;
+  }
+
+  private class AppLevelAggregator implements Runnable {
+
+    private void aggregate() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("App-level real-time aggregating");
+      }
+      if (!isReadyToAggregate()) {
+        LOG.warn("App-level collector is not ready, skip aggregation. ");
+        return;
+      }
+      try {
+        TimelineCollectorContext currContext = getTimelineEntityContext();
+        Map<String, AggregationStatusTable> aggregationGroups
+            = getAggregationGroups();
+        if (aggregationGroups == null
+            || aggregationGroups.isEmpty()) {
+          LOG.debug("App-level collector is empty, skip aggregation. ");
+          return;
+        }
+        TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
+            aggregationGroups, currContext.getAppId(),
+            TimelineEntityType.YARN_APPLICATION.toString());
+        TimelineEntities entities = new TimelineEntities();
+        entities.addEntity(resultEntity);
+        putEntitiesAsync(entities, getCurrentUser());
+      } catch (Exception e) {
+        LOG.error("Error aggregating timeline metrics", e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("App-level real-time aggregation complete");
+      }
+    }
+
+    @Override
+    public void run() {
+      aggregate();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb4f4403/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index 266bd04..93e5666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -118,7 +118,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
    */
   public boolean addApplication(ApplicationId appId) {
     AppLevelTimelineCollector collector =
-        new AppLevelTimelineCollector(appId);
+        new AppLevelTimelineCollectorWithAgg(appId);
     return (collectorManager.putIfAbsent(appId, collector)
         == collector);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb4f4403/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
index 7bc89c5..a59f8c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
@@ -95,7 +95,7 @@ public class TestNMTimelineCollectorManager {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId);
+              new AppLevelTimelineCollectorWithAgg(appId);
           return (collectorManager.putIfAbsent(appId, collector) == collector);
         }
       };
@@ -126,7 +126,7 @@ public class TestNMTimelineCollectorManager {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId);
+              new AppLevelTimelineCollectorWithAgg(appId);
           boolean successPut =
               (collectorManager.putIfAbsent(appId, collector) == collector);
           return successPut && collectorManager.remove(appId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org