You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/03/24 21:26:05 UTC
hadoop git commit: YARN-3034. Implement RM starting its timeline
collector. Contributed by Naganarasimha G R
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 04de2cecc -> dc12cad2b
YARN-3034. Implement RM starting its timeline collector. Contributed by Naganarasimha G R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dc12cad2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dc12cad2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dc12cad2
Branch: refs/heads/YARN-2928
Commit: dc12cad2b89f643dafa0def863325cb374c7670c
Parents: 04de2ce
Author: Junping Du <ju...@apache.org>
Authored: Tue Mar 24 13:42:14 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Tue Mar 24 13:42:14 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 14 ++-
.../src/main/resources/yarn-default.xml | 11 +-
.../hadoop-yarn-server-resourcemanager/pom.xml | 4 +
.../resourcemanager/RMActiveServiceContext.java | 15 +++
.../yarn/server/resourcemanager/RMContext.java | 5 +
.../server/resourcemanager/RMContextImpl.java | 14 ++-
.../server/resourcemanager/ResourceManager.java | 35 +++++--
.../metrics/SystemMetricsPublisher.java | 29 +++---
.../timelineservice/RMTimelineCollector.java | 104 +++++++++++++++++++
10 files changed, 205 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 259cf64..111de71 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -38,6 +38,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3377. Fixed test failure in TestTimelineServiceClientIntegration.
(Sangjin Lee via zjshen)
+ YARN-3034. Implement RM starting its timeline collector. (Naganarasimha G R
+ via junping_du)
+
IMPROVEMENTS
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 73f11b8..13cdcbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -351,12 +351,20 @@ public class YarnConfiguration extends Configuration {
/**
* The setting that controls whether yarn system metrics is published on the
- * timeline server or not by RM.
+ * timeline server or not by RM. This configuration setting is for ATS V1.
*/
- public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
- RM_PREFIX + "system-metrics-publisher.enabled";
+ public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+ + "system-metrics-publisher.enabled";
public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
+ /**
+ * The setting that controls whether yarn system metrics is published on the
+ * timeline server or not by RM and NM. This configuration setting is for ATS V2.
+ */
+ public static final String SYSTEM_METRICS_PUBLISHER_ENABLED = YARN_PREFIX
+ + "system-metrics-publisher.enabled";
+ public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
+
public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9ac54ce..c4887b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -654,12 +654,21 @@
<property>
<description>The setting that controls whether yarn system metrics is
- published on the timeline server or not by RM.</description>
+ published to the Timeline server (version one) or not, by RM.
+ This configuration is deprecated.</description>
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
<property>
+ <description>The setting that controls whether yarn system metrics is
+ published to the Timeline server (version two) or not, by RM and NM.</description>
+ <name>yarn.system-metrics-publisher.enabled</name>
+ <value>false</value>
+ </property>
+
+
+ <property>
<description>Number of worker threads that send the yarn system metrics
data.</description>
<name>yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size</name>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index aaa0de5..1aeda1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -171,6 +171,10 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 03fc40e..cbb0a8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -94,6 +95,8 @@ public class RMActiveServiceContext {
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
+ private RMTimelineCollector timelineCollector;
+
private RMNodeLabelsManager nodeLabelManager;
private long epoch;
private Clock systemClock = new SystemClock();
@@ -376,6 +379,18 @@ public class RMActiveServiceContext {
@Private
@Unstable
+ public RMTimelineCollector getRMTimelineCollector() {
+ return timelineCollector;
+ }
+
+ @Private
+ @Unstable
+ public void setRMTimelineCollector(RMTimelineCollector timelineCollector) {
+ this.timelineCollector = timelineCollector;
+ }
+
+ @Private
+ @Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index ecf6166..b96601c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
/**
* Context of the ResourceManager.
@@ -108,6 +109,10 @@ public interface RMContext {
void setSystemMetricsPublisher(SystemMetricsPublisher systemMetricsPublisher);
SystemMetricsPublisher getSystemMetricsPublisher();
+
+ void setRMTimelineCollector(RMTimelineCollector timelineCollector);
+
+ RMTimelineCollector getRMTimelineCollector();
ConfigurationProvider getConfigurationProvider();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 1d0d6c0..531d4c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
@@ -354,6 +355,17 @@ public class RMContextImpl implements RMContext {
}
@Override
+ public void setRMTimelineCollector(
+ RMTimelineCollector timelineCollector) {
+ activeServiceContext.setRMTimelineCollector(timelineCollector);
+ }
+
+ @Override
+ public RMTimelineCollector getRMTimelineCollector() {
+ return activeServiceContext.getRMTimelineCollector();
+ }
+
+ @Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8bd8e21..b993ede 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -18,7 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -80,11 +88,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
@@ -98,14 +111,7 @@ import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
/**
* The ResourceManager is the main class that is a set of components.
@@ -350,6 +356,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new RMApplicationHistoryWriter();
}
+ private RMTimelineCollector createRMTimelineCollector() {
+ return new RMTimelineCollector();
+ }
+
protected SystemMetricsPublisher createSystemMetricsPublisher() {
return new SystemMetricsPublisher();
}
@@ -472,6 +482,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
+ RMTimelineCollector timelineCollector =
+ createRMTimelineCollector();
+ addService(timelineCollector);
+ rmContext.setRMTimelineCollector(timelineCollector);
+
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index b849b00..2828aec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
- * The class that helps RM publish metrics to the timeline server. RM will
+ * The class that helps RM publish metrics to the timeline server V1. RM will
* always invoke the methods of this class regardless the service is enabled or
* not. If it is disabled, publishing requests will be ignored silently.
*/
@@ -66,7 +66,7 @@ public class SystemMetricsPublisher extends CompositeService {
private Dispatcher dispatcher;
private TimelineClient client;
- private boolean publishSystemMetrics;
+ private boolean publishSystemMetricsToATSv1;
public SystemMetricsPublisher() {
super(SystemMetricsPublisher.class.getName());
@@ -74,13 +74,14 @@ public class SystemMetricsPublisher extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
- publishSystemMetrics =
+ publishSystemMetricsToATSv1 =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) &&
- conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
- YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+ && conf.getBoolean(
+ YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+ YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
client = TimelineClient.createTimelineClient();
addIfService(client);
@@ -97,7 +98,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ApplicationCreatedEvent(
app.getApplicationId(),
@@ -112,7 +113,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ApplicationFinishedEvent(
app.getApplicationId(),
@@ -129,7 +130,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appACLsUpdated(RMApp app, String appViewACLs,
long updatedTime) {
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ApplicationACLsUpdatedEvent(
app.getApplicationId(),
@@ -141,7 +142,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new AppAttemptRegisteredEvent(
appAttempt.getAppAttemptId(),
@@ -157,7 +158,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new AppAttemptFinishedEvent(
appAttempt.getAppAttemptId(),
@@ -174,7 +175,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerCreated(RMContainer container, long createdTime) {
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ContainerCreatedEvent(
container.getContainerId(),
@@ -187,7 +188,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container, long finishedTime) {
- if (publishSystemMetrics) {
+ if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ContainerFinishedEvent(
container.getContainerId(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc12cad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
new file mode 100644
index 0000000..22743d6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+
+/**
+ * This class is responsible for posting application and application attempt
+ * lifecycle-related events to timeline service V2.
+ */
+@Private
+@Unstable
+public class RMTimelineCollector extends TimelineCollector {
+ private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class);
+
+ public RMTimelineCollector() {
+ super("Resource Manager TimelineCollector");
+ }
+
+ private Dispatcher dispatcher;
+
+ private boolean publishSystemMetricsForV2;
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ publishSystemMetricsForV2 =
+ conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+ && conf.getBoolean(
+ YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+ YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
+
+ if (publishSystemMetricsForV2) {
+ // having separate dispatcher to avoid load on RMDispatcher
+ LOG.info("RMTimelineCollector has been configured to publish"
+ + " System Metrics in ATS V2");
+ dispatcher = new AsyncDispatcher();
+ dispatcher.register(SystemMetricsEventType.class,
+ new ForwardingEventHandler());
+ } else {
+ LOG.warn("RMTimelineCollector has not been configured to publish"
+ + " System Metrics in ATS V2");
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
+ switch (event.getType()) {
+ default:
+ LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
+ }
+ }
+
+ /**
+ * EventHandler implementation which forwards events to RMTimelineCollector.
+ * Making use of it, RMTimelineCollector can avoid having a public handle
+ * method.
+ */
+ private final class ForwardingEventHandler implements
+ EventHandler<SystemMetricsEvent> {
+
+ @Override
+ public void handle(SystemMetricsEvent event) {
+ handleSystemMetricsEvent(event);
+ }
+ }
+}