You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/13 20:33:18 UTC
[1/2] hadoop git commit: YARN-3044. Made RM write app,
attempt and optional container lifecycle events to timeline service
v2. Contributed by Naganarasimha G R.
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 0a3c14782 -> 17842a3f6
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
new file mode 100644
index 0000000..9830a80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSystemMetricsPublisherForV2 {
+
+ /**
+ * Folder configured as the storage root of FileSystemTimelineWriterImpl;
+ * the tests look for the written entity files underneath it.
+ */
+ protected static File testRootDir = new File("target",
+ TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
+ .getAbsoluteFile();
+
+ /** Publisher shared by the publish tests; created and started in setup(). */
+ private static SystemMetricsPublisher metricsPublisher;
+ /** Dispatcher returned by the publisher's createDispatcher() override. */
+ private static DrainDispatcher dispatcher = new DrainDispatcher();
+ /** Flow version component of the expected entity directory. */
+ private static final String DEFAULT_FLOW_VERSION = "1";
+ /** Flow run component of the expected entity directory. */
+ private static final long DEFAULT_FLOW_RUN = 1;
+
+ /** Application map returned by the mocked RMContext#getRMApps(). */
+ private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
+
+ /** Collector manager returned by the mocked RMContext; started in setup(). */
+ private static RMTimelineCollectorManager rmTimelineCollectorManager;
+
+ /**
+ * Prepares the fixture shared by all tests of this class: clears any stale
+ * output directory, wires a mocked RMContext to the application map and to
+ * a started RMTimelineCollectorManager, and starts a SystemMetricsPublisher
+ * whose createDispatcher() returns the shared DrainDispatcher.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ // Remove output left behind by an earlier run.
+ if (testRootDir.exists()) {
+ FileContext localFs = FileContext.getLocalFSFileContext();
+ localFs.delete(new Path(testRootDir.getAbsolutePath()), true);
+ }
+
+ final RMContext mockedContext = mock(RMContext.class);
+ rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
+ when(mockedContext.getRMApps()).thenReturn(rmAppsMapInContext);
+ rmTimelineCollectorManager =
+ new RMTimelineCollectorManager(mockedContext);
+ when(mockedContext.getRMTimelineCollectorManager()).thenReturn(
+ rmTimelineCollectorManager);
+
+ final Configuration v2Conf = getTimelineV2Conf();
+ rmTimelineCollectorManager.init(v2Conf);
+ rmTimelineCollectorManager.start();
+
+ metricsPublisher = new SystemMetricsPublisher(mockedContext) {
+ @Override
+ Dispatcher createDispatcher(
+ TimelineServicePublisher timelineServicePublisher) {
+ // Hand out the shared dispatcher so tests can await() on it.
+ return dispatcher;
+ }
+ };
+ metricsPublisher.init(v2Conf);
+ metricsPublisher.start();
+ }
+
+ /**
+ * Stops the services started in setup() and then removes the test output
+ * directory. The directory is deleted last, inside a finally block, so it is
+ * cleaned up even when a stop() call throws, and so it is not removed while
+ * the services configured to write beneath it are still running.
+ */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ try {
+ if (rmTimelineCollectorManager != null) {
+ rmTimelineCollectorManager.stop();
+ }
+ if (metricsPublisher != null) {
+ metricsPublisher.stop();
+ }
+ } finally {
+ if (testRootDir.exists()) {
+ FileContext.getLocalFSFileContext().delete(
+ new Path(testRootDir.getAbsolutePath()), true);
+ }
+ }
+ }
+
+ /**
+ * Builds the configuration used by the tests: timeline service and system
+ * metrics publisher enabled, a dispatcher pool size of 2, RM container
+ * metrics publishing enabled, and the timeline writer storage root pointed
+ * at testRootDir. Fails the test if the canonical path of testRootDir
+ * cannot be resolved.
+ *
+ * @return a fresh Configuration carrying the settings above
+ */
+ private static Configuration getTimelineV2Conf() {
+ final Configuration v2Conf = new Configuration();
+ v2Conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ v2Conf.setBoolean(
+ YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+ v2Conf.setInt(
+ YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
+ v2Conf.setBoolean(
+ YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, true);
+ try {
+ String storageRoot = testRootDir.getCanonicalPath();
+ v2Conf.set(
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ storageRoot);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert
+ .fail("Exception while setting the TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
+ }
+ return v2Conf;
+ }
+
+ /**
+ * Verifies that init() honours RM_PUBLISH_CONTAINER_METRICS_ENABLED: a
+ * publisher initialized with the flag at its default value must not publish
+ * container metrics, while one initialized with the shared v2 configuration
+ * (flag set to true) must publish them and use a MultiThreadedDispatcher.
+ */
+ @Test
+ public void testSystemMetricPublisherInitialization() {
+ // Local publisher; named differently so it does not shadow the shared
+ // static metricsPublisher used by the other tests.
+ @SuppressWarnings("resource")
+ SystemMetricsPublisher publisher =
+ new SystemMetricsPublisher(mock(RMContext.class));
+ try {
+ Configuration conf = getTimelineV2Conf();
+ // getTimelineV2Conf() enables the flag, so reset it to the default.
+ conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+ YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+ publisher.init(conf);
+ assertFalse(
+ "Default configuration should not publish container Metrics from RM",
+ publisher.isPublishContainerMetrics());
+
+ publisher.stop();
+
+ publisher = new SystemMetricsPublisher(mock(RMContext.class));
+ conf = getTimelineV2Conf();
+ publisher.init(conf);
+ assertTrue("Expected to publish container Metrics from RM",
+ publisher.isPublishContainerMetrics());
+ // Container metrics publishing is enabled at this point.
+ assertTrue(
+ "MultiThreadedDispatcher expected when container Metrics is published",
+ publisher.getDispatcher() instanceof MultiThreadedDispatcher);
+ } finally {
+ publisher.stop();
+ }
+ }
+
+ /**
+ * Publishes created, ACLs-updated and finished events for an application
+ * and checks that the application's entity file exists under the
+ * YARN_APPLICATION directory and contains three non-empty lines.
+ */
+ @Test(timeout = 10000)
+ public void testPublishApplicationMetrics() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ RMApp app = createAppAndRegister(appId);
+
+ metricsPublisher.appCreated(app, app.getStartTime());
+ metricsPublisher.appACLsUpdated(app, "user1,user2", 4L);
+ metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
+ // Wait until the dispatcher has drained all queued events.
+ dispatcher.await();
+
+ String outputDirApp =
+ getTimelineEntityDir(app) + "/" + TimelineEntityType.YARN_APPLICATION
+ + "/";
+
+ File entityFolder = new File(outputDirApp);
+ Assert.assertTrue(entityFolder.isDirectory());
+
+ // file name is <entityId>.thist
+ String timelineServiceFileName =
+ appId.toString()
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ File appFile = new File(outputDirApp, timelineServiceFileName);
+ Assert.assertTrue(appFile.exists());
+ Assert.assertEquals("Expected 3 events to be published", 3,
+ getNumOfNonEmptyLines(appFile));
+ }
+
+ /**
+ * Publishes registered and finished events for an application attempt and
+ * checks that the attempt's entity file exists under the
+ * YARN_APPLICATION_ATTEMPT directory and contains two non-empty lines.
+ */
+ @Test(timeout = 10000)
+ public void testPublishAppAttemptMetrics() throws Exception {
+ final ApplicationId appId = ApplicationId.newInstance(0, 1);
+ // Reuse the application if an earlier test already registered it.
+ final RMApp registered = rmAppsMapInContext.get(appId);
+ final RMApp app =
+ registered != null ? registered : createAppAndRegister(appId);
+
+ final ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ final RMAppAttempt attempt = createRMAppAttempt(attemptId);
+ final long registeredTime = Integer.MAX_VALUE + 1L;
+ final long finishedTime = Integer.MAX_VALUE + 2L;
+
+ metricsPublisher.appAttemptRegistered(attempt, registeredTime);
+ when(app.getFinalApplicationStatus()).thenReturn(
+ FinalApplicationStatus.UNDEFINED);
+ metricsPublisher.appAttemptFinished(attempt, RMAppAttemptState.FINISHED,
+ app, finishedTime);
+
+ // Wait until the dispatcher has drained all queued events.
+ dispatcher.await();
+
+ final String attemptDir =
+ getTimelineEntityDir(app) + "/"
+ + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
+ Assert.assertTrue(new File(attemptDir).isDirectory());
+
+ // file name is <entityId>.thist
+ final File attemptFile = new File(attemptDir, attemptId.toString()
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+ Assert.assertTrue(attemptFile.exists());
+ Assert.assertEquals("Expected 2 events to be published", 2,
+ getNumOfNonEmptyLines(attemptFile));
+ }
+
+ /**
+ * Publishes created and finished events for a container and checks that
+ * the container's entity file exists under the YARN_CONTAINER directory and
+ * contains two non-empty lines.
+ */
+ @Test(timeout = 10000)
+ public void testPublishContainerMetrics() throws Exception {
+ final ApplicationId appId = ApplicationId.newInstance(0, 1);
+ // Reuse the application if an earlier test already registered it.
+ final RMApp registered = rmAppsMapInContext.get(appId);
+ final RMApp app =
+ registered != null ? registered : createAppAndRegister(appId);
+
+ final ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ final ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
+ final RMContainer rmContainer = createRMContainer(containerId);
+
+ metricsPublisher.containerCreated(rmContainer,
+ rmContainer.getCreationTime());
+ metricsPublisher.containerFinished(rmContainer,
+ rmContainer.getFinishTime());
+ // Wait until the dispatcher has drained all queued events.
+ dispatcher.await();
+
+ final String containerDir =
+ getTimelineEntityDir(app) + "/"
+ + TimelineEntityType.YARN_CONTAINER + "/";
+ Assert.assertTrue(new File(containerDir).isDirectory());
+
+ // file name is <entityId>.thist
+ final File containerFile = new File(containerDir, containerId.toString()
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+ Assert.assertTrue(containerFile.exists());
+ Assert.assertEquals("Expected 2 events to be published", 2,
+ getNumOfNonEmptyLines(containerFile));
+ }
+
+ /**
+ * Creates a mocked application for the given id, adds it to the RMContext
+ * application map and registers an AppLevelTimelineCollector for it with
+ * the collector manager; both registrations use putIfAbsent.
+ *
+ * @param appId id of the application to create
+ * @return the newly created mock
+ */
+ private RMApp createAppAndRegister(ApplicationId appId) {
+ final RMApp mockedApp = createRMApp(appId);
+
+ // some stuff which are currently taken care in RMAppImpl
+ rmAppsMapInContext.putIfAbsent(appId, mockedApp);
+ rmTimelineCollectorManager.putIfAbsent(appId,
+ new AppLevelTimelineCollector(appId));
+ return mockedApp;
+ }
+
+ /**
+ * Counts the lines of the given file that are non-empty after
+ * String#trim().
+ *
+ * @param entityFile file to read
+ * @return number of lines whose trimmed length is greater than zero
+ * @throws IOException if the file cannot be opened or read
+ */
+ private long getNumOfNonEmptyLines(File entityFile) throws IOException {
+ long count = 0;
+ // Open outside the try block: if opening fails there is nothing to close,
+ // and the IOException is reported instead of a NullPointerException
+ // thrown by close() on a reader that was never assigned.
+ BufferedReader reader = new BufferedReader(new FileReader(entityFile));
+ try {
+ String strLine;
+ while ((strLine = reader.readLine()) != null) {
+ if (strLine.trim().length() > 0) {
+ count++;
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ return count;
+ }
+
+ /**
+ * Builds the directory path under which the tests expect the entities of
+ * the given application, in the form
+ * root/ENTITIES_DIR/clusterId/user/flowId/flowVersion/flowRun/appId.
+ *
+ * @param app application whose user and id form part of the path
+ * @return the expected directory path, without a trailing separator
+ */
+ private String getTimelineEntityDir(RMApp app) {
+ StringBuilder dir = new StringBuilder(testRootDir.getAbsolutePath());
+ dir.append("/").append(FileSystemTimelineWriterImpl.ENTITIES_DIR);
+ dir.append("/").append(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+ dir.append("/").append(app.getUser());
+ String flowId = TimelineUtils.generateDefaultFlowIdBasedOnAppId(
+ app.getApplicationId());
+ dir.append("/").append(flowId);
+ dir.append("/").append(DEFAULT_FLOW_VERSION);
+ dir.append("/").append(DEFAULT_FLOW_RUN);
+ dir.append("/").append(app.getApplicationId());
+ return dir.toString();
+ }
+
+ /**
+ * Creates a mocked RMAppImpl stubbed with fixed test values: name, type,
+ * user, queue, submit/start/finish times, diagnostics, a current attempt
+ * with attempt id 1, an UNDEFINED final status, metrics and an empty tag
+ * set.
+ *
+ * @param appId id the mock reports from getApplicationId()
+ * @return the stubbed application mock
+ */
+ private static RMApp createRMApp(ApplicationId appId) {
+ // The stubbed timestamps all lie just above Integer.MAX_VALUE.
+ final long baseTime = Integer.MAX_VALUE;
+
+ final RMAppAttempt currentAttempt = mock(RMAppAttempt.class);
+ when(currentAttempt.getAppAttemptId()).thenReturn(
+ ApplicationAttemptId.newInstance(appId, 1));
+
+ final RMApp mockedApp = mock(RMAppImpl.class);
+ when(mockedApp.getApplicationId()).thenReturn(appId);
+ when(mockedApp.getName()).thenReturn("test app");
+ when(mockedApp.getApplicationType()).thenReturn("test app type");
+ when(mockedApp.getUser()).thenReturn("testUser");
+ when(mockedApp.getQueue()).thenReturn("test queue");
+ when(mockedApp.getSubmitTime()).thenReturn(baseTime + 1);
+ when(mockedApp.getStartTime()).thenReturn(baseTime + 2);
+ when(mockedApp.getFinishTime()).thenReturn(baseTime + 3);
+ when(mockedApp.getDiagnostics()).thenReturn(
+ new StringBuilder("test diagnostics info"));
+ when(mockedApp.getCurrentAppAttempt()).thenReturn(currentAttempt);
+ when(mockedApp.getFinalApplicationStatus()).thenReturn(
+ FinalApplicationStatus.UNDEFINED);
+ when(mockedApp.getRMAppMetrics()).thenReturn(
+ new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
+ when(mockedApp.getApplicationTags()).thenReturn(
+ Collections.<String> emptySet());
+ return mockedApp;
+ }
+
+ /**
+ * Creates a mocked RMAppAttempt stubbed with fixed test values: host, RPC
+ * port, a master container with container id 1 of the attempt,
+ * diagnostics, tracking URL and original tracking URL.
+ *
+ * @param appAttemptId id the mock reports from getAppAttemptId()
+ * @return the stubbed attempt mock
+ */
+ private static RMAppAttempt createRMAppAttempt(
+ ApplicationAttemptId appAttemptId) {
+ final Container masterContainer = mock(Container.class);
+ when(masterContainer.getId()).thenReturn(
+ ContainerId.newContainerId(appAttemptId, 1));
+
+ final RMAppAttempt mockedAttempt = mock(RMAppAttempt.class);
+ when(mockedAttempt.getAppAttemptId()).thenReturn(appAttemptId);
+ when(mockedAttempt.getHost()).thenReturn("test host");
+ when(mockedAttempt.getRpcPort()).thenReturn(-100);
+ when(mockedAttempt.getMasterContainer()).thenReturn(masterContainer);
+ when(mockedAttempt.getDiagnostics()).thenReturn("test diagnostics info");
+ when(mockedAttempt.getTrackingUrl()).thenReturn("test tracking url");
+ when(mockedAttempt.getOriginalTrackingUrl()).thenReturn(
+ "test original tracking url");
+ return mockedAttempt;
+ }
+
+ /**
+ * Creates a mocked RMContainer stubbed with fixed test values: allocated
+ * node, resource and priority, creation and finish times, diagnostics,
+ * exit status, a COMPLETE state and an underlying Container that reports a
+ * node HTTP address.
+ *
+ * @param containerId id the mock reports from getContainerId()
+ * @return the stubbed container mock
+ */
+ private static RMContainer createRMContainer(ContainerId containerId) {
+ // The stubbed timestamps all lie just above Integer.MAX_VALUE.
+ final long baseTime = Integer.MAX_VALUE;
+
+ final Container underlying = mock(Container.class);
+ when(underlying.getNodeHttpAddress())
+ .thenReturn("http://localhost:1234");
+
+ final RMContainer mockedContainer = mock(RMContainer.class);
+ when(mockedContainer.getContainerId()).thenReturn(containerId);
+ when(mockedContainer.getAllocatedNode()).thenReturn(
+ NodeId.newInstance("test host", -100));
+ when(mockedContainer.getAllocatedResource()).thenReturn(
+ Resource.newInstance(-1, -1));
+ when(mockedContainer.getAllocatedPriority()).thenReturn(
+ Priority.UNDEFINED);
+ when(mockedContainer.getCreationTime()).thenReturn(baseTime + 1);
+ when(mockedContainer.getFinishTime()).thenReturn(baseTime + 2);
+ when(mockedContainer.getDiagnosticsInfo()).thenReturn(
+ "test diagnostics info");
+ when(mockedContainer.getContainerExitStatus()).thenReturn(-1);
+ when(mockedContainer.getContainerState()).thenReturn(
+ ContainerState.COMPLETE);
+ when(mockedContainer.getContainer()).thenReturn(underlying);
+ return mockedContainer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 1fc7651..2fff98d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -55,7 +55,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
= "/tmp/timeline_service_data";
- private static final String ENTITIES_DIR = "entities";
+ public static final String ENTITIES_DIR = "entities";
/** Default extension for output files. */
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
@@ -76,7 +76,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
return response;
}
- private void write(String clusterId, String userId, String flowName,
+ private synchronized void write(String clusterId, String userId, String flowName,
String flowVersion, long flowRun, String appId, TimelineEntity entity,
TimelineWriteResponse response) throws IOException {
PrintWriter out = null;
[2/2] hadoop git commit: YARN-3044. Made RM write app,
attempt and optional container lifecycle events to timeline service
v2. Contributed by Naganarasimha G R.
Posted by zj...@apache.org.
YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/17842a3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/17842a3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/17842a3f
Branch: refs/heads/YARN-2928
Commit: 17842a3f61ed33ec831a889c11a74d4814be73ec
Parents: 0a3c147
Author: Zhijie Shen <zj...@apache.org>
Authored: Sat Jun 13 11:32:41 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Sat Jun 13 11:32:41 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../records/timelineservice/TimelineEntity.java | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 10 +
.../distributedshell/TestDistributedShell.java | 118 +++--
.../metrics/ContainerMetricsConstants.java | 10 +
.../server/resourcemanager/ResourceManager.java | 2 +-
.../AbstractTimelineServicePublisher.java | 179 ++++++++
.../metrics/ApplicationFinishedEvent.java | 13 +-
.../metrics/SystemMetricsPublisher.java | 426 ++++---------------
.../metrics/TimelineServiceV1Publisher.java | 265 ++++++++++++
.../metrics/TimelineServiceV2Publisher.java | 296 +++++++++++++
.../server/resourcemanager/rmapp/RMAppImpl.java | 2 -
.../TestRMAppLogAggregationStatus.java | 2 +-
.../metrics/TestSystemMetricsPublisher.java | 8 +-
.../TestSystemMetricsPublisherForV2.java | 374 ++++++++++++++++
.../storage/FileSystemTimelineWriterImpl.java | 4 +-
16 files changed, 1321 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ab0dcb9..7b4b5d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3721. build is broken on YARN-2928 branch due to possible dependency
cycle (Li Lu via sjlee)
+ YARN-3044. Made RM write app, attempt and optional container lifecycle
+ events to timeline service v2. (Naganarasimha G R via zjshen)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index defadec..a641f32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -470,4 +470,7 @@ public class TimelineEntity {
return real == null ? this : real;
}
+ public String toString() {
+ return identifier.toString();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e4ae2b7..2f7892c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -378,6 +378,16 @@ public class YarnConfiguration extends Configuration {
+ "system-metrics-publisher.enabled";
public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
+ /**
+ * The setting that controls whether yarn container metrics is published to
+ * the timeline server or not by RM. This configuration setting is for ATS
+ * V2
+ */
+ public static final String RM_PUBLISH_CONTAINER_METRICS_ENABLED = YARN_PREFIX
+ + "rm.system-metrics-publisher.emit-container-events";
+ public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED =
+ false;
+
public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index b887c77..7883f1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -52,13 +52,14 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -107,6 +108,7 @@ public class TestDistributedShell {
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+ ".class", PerNodeTimelineCollectorsAuxService.class.getName());
}
+ conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
// Enable ContainersMonitorImpl
@@ -373,48 +375,96 @@ public class TestDistributedShell {
"/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
appId.toString();
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
- String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
-
- File entityFolder = new File(outputDirApp);
- Assert.assertTrue(entityFolder.isDirectory());
+ // Verify DS_APP_ATTEMPT entities posted by the client
// there will be at least one attempt, look for that file
- String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
- + "_000" + appId.getId() + "_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- String appAttemptFileName = outputDirApp + appTimestampFileName;
- File appAttemptFile = new File(appAttemptFileName);
- Assert.assertTrue(appAttemptFile.exists());
-
- String outputDirContainer = basePath + "/DS_CONTAINER/";
- File containerFolder = new File(outputDirContainer);
- Assert.assertTrue(containerFolder.isDirectory());
-
- String containerTimestampFileName = "container_"
- + appId.getClusterTimestamp() + "_000" + appId.getId()
- + "_01_000002.thist";
- String containerFileName = outputDirContainer + containerTimestampFileName;
- File containerFile = new File(containerFileName);
- Assert.assertTrue(containerFile.exists());
+ String appTimestampFileName =
+ "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ + "_000001"
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
+ appTimestampFileName);
+
+ // Verify DS_CONTAINER entities posted by the client
+ String containerTimestampFileName =
+ "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ + "_01_000002.thist";
+ verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
+ containerTimestampFileName);
// Verify NM posting container metrics info.
- String outputDirContainerMetrics = basePath + "/" +
- TimelineEntityType.YARN_CONTAINER + "/";
- File containerMetricsFolder = new File(outputDirContainerMetrics);
- Assert.assertTrue(containerMetricsFolder.isDirectory());
+ String containerMetricsTimestampFileName =
+ "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ + "_01_000001"
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ verifyEntityTypeFileExists(basePath,
+ TimelineEntityType.YARN_CONTAINER.toString(),
+ containerMetricsTimestampFileName);
+
+ // Verify RM posting Application life cycle Events are getting published
+ String appMetricsTimestampFileName =
+ "application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ File appEntityFile =
+ verifyEntityTypeFileExists(basePath,
+ TimelineEntityType.YARN_APPLICATION.toString(),
+ appMetricsTimestampFileName);
+ verifyStringExistsSpecifiedTimes(appEntityFile,
+ ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1,
+ "Application created event should be published atleast once");
+ verifyStringExistsSpecifiedTimes(appEntityFile,
+ ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1,
+ "Application finished event should be published atleast once");
+
+ // Verify RM posting AppAttempt life cycle Events are getting published
+ String appAttemptMetricsTimestampFileName =
+ "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ + "_000001"
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ File appAttemptEntityFile =
+ verifyEntityTypeFileExists(basePath,
+ TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+ appAttemptMetricsTimestampFileName);
+ verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
+ AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1,
+ "AppAttempt register event should be published atleast once");
+ verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
+ AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1,
+ "AppAttempt finished event should be published atleast once");
+ } finally {
+ FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+ }
+ }
- String containerMetricsTimestampFileName = "container_"
- + appId.getClusterTimestamp() + "_000" + appId.getId()
- + "_01_000001.thist";
- String containerMetricsFileName = outputDirContainerMetrics +
- containerMetricsTimestampFileName;
+ private File verifyEntityTypeFileExists(String basePath, String entityType,
+ String entityfileName) {
+ String outputDirPathForEntity = basePath + "/" + entityType + "/";
+ File outputDirForEntity = new File(outputDirPathForEntity);
+ Assert.assertTrue(outputDirForEntity.isDirectory());
- File containerMetricsFile = new File(containerMetricsFileName);
- Assert.assertTrue(containerMetricsFile.exists());
+ String entityFilePath = outputDirPathForEntity + entityfileName;
+ File entityFile = new File(entityFilePath);
+ Assert.assertTrue(entityFile.exists());
+ return entityFile;
+ }
+
+ private void verifyStringExistsSpecifiedTimes(File entityFile,
+ String searchString, long expectedNumOfTimes, String errorMsg)
+ throws IOException {
+ BufferedReader reader = null;
+ String strLine;
+ long actualCount = 0;
+ try {
+ reader = new BufferedReader(new FileReader(entityFile));
+ while ((strLine = reader.readLine()) != null) {
+ if (strLine.trim().contains(searchString))
+ actualCount++;
+ }
} finally {
- FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+ reader.close();
}
+ Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 0d5540d..7b42994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -27,10 +27,20 @@ public class ContainerMetricsConstants {
public static final String ENTITY_TYPE = "YARN_CONTAINER";
+ // Event of this type will be emitted by NM.
public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED";
+ // Event of this type will be emitted by RM.
+ public static final String CREATED_IN_RM_EVENT_TYPE =
+ "YARN_RM_CONTAINER_CREATED";
+
+ // Event of this type will be emitted by NM.
public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
+ // Event of this type will be emitted by RM.
+ public static final String FINISHED_IN_RM_EVENT_TYPE =
+ "YARN_RM_CONTAINER_FINISHED";
+
public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT";
public static final String ALLOCATED_MEMORY_ENTITY_INFO =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 0feb227..43c2021 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -360,7 +360,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
- return new SystemMetricsPublisher();
+ return new SystemMetricsPublisher(rmContext);
}
// sanity check for configurations
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
new file mode 100644
index 0000000..0d66327
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher;
+
+/**
+ * Base class for the timeline service publishers used by
+ * SystemMetricsPublisher. It is itself the {@link EventHandler} for
+ * {@link SystemMetricsEvent}s: {@link #handle(SystemMetricsEvent)} routes each
+ * event, by type, to one of the abstract {@code publish*} methods that
+ * subclasses implement. It also supplies the {@link MultiThreadedDispatcher}
+ * on which those events are delivered.
+ */
+public abstract class AbstractTimelineServicePublisher extends CompositeService
+    implements TimelineServicePublisher, EventHandler<SystemMetricsEvent> {
+
+  // Logger is keyed on this class so that its messages are attributed to it.
+  private static final Log LOG = LogFactory
+      .getLog(AbstractTimelineServicePublisher.class);
+
+  // Captured in serviceInit; read by getDispatcher for the pool size.
+  private Configuration conf;
+
+  /**
+   * @param name service name passed to {@link CompositeService}
+   */
+  public AbstractTimelineServicePublisher(String name) {
+    super(name);
+  }
+
+  /**
+   * Remembers the configuration before delegating to the parent service.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Routes the event to the publish method matching its type. Event types
+   * without a matching case are logged as errors and otherwise ignored.
+   *
+   * @param event the system metrics event to publish
+   */
+  @Override
+  public void handle(SystemMetricsEvent event) {
+    switch (event.getType()) {
+    case APP_CREATED:
+      publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
+      break;
+    case APP_FINISHED:
+      publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
+      break;
+    case APP_ACLS_UPDATED:
+      publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
+      break;
+    case APP_ATTEMPT_REGISTERED:
+      publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
+      break;
+    case APP_ATTEMPT_FINISHED:
+      publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
+      break;
+    case CONTAINER_CREATED:
+      publishContainerCreatedEvent((ContainerCreatedEvent) event);
+      break;
+    case CONTAINER_FINISHED:
+      publishContainerFinishedEvent((ContainerFinishedEvent) event);
+      break;
+    default:
+      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
+    }
+  }
+
+  abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event);
+
+  abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event);
+
+  abstract void publishApplicationACLsUpdatedEvent(
+      ApplicationACLsUpdatedEvent event);
+
+  abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event);
+
+  abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event);
+
+  abstract void publishContainerCreatedEvent(ContainerCreatedEvent event);
+
+  abstract void publishContainerFinishedEvent(ContainerFinishedEvent event);
+
+  /**
+   * Builds a new {@link MultiThreadedDispatcher} on every call, sized from
+   * RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE and set to drain its
+   * events on stop. It reads the configuration stored by
+   * {@link #serviceInit(Configuration)}, so it must be called after init.
+   *
+   * @return the newly created dispatcher
+   */
+  @Override
+  public Dispatcher getDispatcher() {
+    MultiThreadedDispatcher dispatcher =
+        new MultiThreadedDispatcher(
+            conf.getInt(
+                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
+    dispatcher.setDrainEventsOnStop();
+    return dispatcher;
+  }
+
+  /**
+   * @return {@code true} in this base implementation
+   */
+  @Override
+  public boolean publishRMContainerMetrics() {
+    return true;
+  }
+
+  /**
+   * @return this publisher, which handles the events itself
+   */
+  @Override
+  public EventHandler<SystemMetricsEvent> getEventHandler() {
+    return this;
+  }
+
+  /**
+   * A {@link Dispatcher} that fans events out over a fixed list of child
+   * {@link AsyncDispatcher}s, choosing the child from the event's hash code.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static class MultiThreadedDispatcher extends CompositeService
+      implements Dispatcher {
+
+    private final List<AsyncDispatcher> dispatchers =
+        new ArrayList<AsyncDispatcher>();
+
+    /**
+     * @param num number of child dispatchers to create; event handling
+     *          takes the hash code modulo this count, so it has to be
+     *          greater than zero for events to be dispatched
+     */
+    public MultiThreadedDispatcher(int num) {
+      super(MultiThreadedDispatcher.class.getName());
+      for (int i = 0; i < num; ++i) {
+        AsyncDispatcher dispatcher = createDispatcher();
+        dispatchers.add(dispatcher);
+        addIfService(dispatcher);
+      }
+    }
+
+    /**
+     * @return a new handler that forwards each event to one child dispatcher
+     */
+    @Override
+    public EventHandler getEventHandler() {
+      return new CompositEventHandler();
+    }
+
+    /**
+     * Registers the handler for the event type on every child dispatcher.
+     */
+    @Override
+    public void register(Class<? extends Enum> eventType, EventHandler handler) {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.register(eventType, handler);
+      }
+    }
+
+    /**
+     * Asks every child dispatcher to drain its events on stop.
+     */
+    public void setDrainEventsOnStop() {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.setDrainEventsOnStop();
+      }
+    }
+
+    private class CompositEventHandler implements EventHandler<Event> {
+
+      @Override
+      public void handle(Event event) {
+        // Use hashCode (of ApplicationId) to dispatch the event to the child
+        // dispatcher, such that all the writing events of one application
+        // will be handled by one thread and the scheduled order of these
+        // events will be preserved. Masking with Integer.MAX_VALUE keeps the
+        // index non-negative for negative hash codes.
+        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
+        dispatchers.get(index).getEventHandler().handle(event);
+      }
+    }
+
+    /**
+     * Factory for the child dispatchers; subclasses may override it.
+     *
+     * @return a new {@link AsyncDispatcher}
+     */
+    protected AsyncDispatcher createDispatcher() {
+      return new AsyncDispatcher();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
index 8d75f92..d9241b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
public class ApplicationFinishedEvent extends
@@ -33,6 +35,7 @@ public class ApplicationFinishedEvent extends
private YarnApplicationState state;
private ApplicationAttemptId latestAppAttemptId;
private RMAppMetrics appMetrics;
+ private RMAppImpl app;
public ApplicationFinishedEvent(
ApplicationId appId,
@@ -41,14 +44,16 @@ public class ApplicationFinishedEvent extends
YarnApplicationState state,
ApplicationAttemptId latestAppAttemptId,
long finishedTime,
- RMAppMetrics appMetrics) {
+ RMAppMetrics appMetrics,
+ RMAppImpl app) {
super(SystemMetricsEventType.APP_FINISHED, finishedTime);
this.appId = appId;
this.diagnosticsInfo = diagnosticsInfo;
this.appStatus = appStatus;
this.latestAppAttemptId = latestAppAttemptId;
this.state = state;
- this.appMetrics=appMetrics;
+ this.appMetrics = appMetrics;
+ this.app = app;
}
@Override
@@ -56,6 +61,10 @@ public class ApplicationFinishedEvent extends
return appId.hashCode();
}
+ public RMAppImpl getApp() {
+ return app;
+ }
+
public ApplicationId getApplicationId() {
return appId;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index 63461b5..629636b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -18,10 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,31 +25,23 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+import com.google.common.annotations.VisibleForTesting;
/**
- * The class that helps RM publish metrics to the timeline server V1. RM will
+ * The class that helps RM publish metrics to the timeline server. RM will
* always invoke the methods of this class regardless the service is enabled or
* not. If it is disabled, publishing requests will be ignored silently.
*/
@@ -65,30 +53,38 @@ public class SystemMetricsPublisher extends CompositeService {
.getLog(SystemMetricsPublisher.class);
private Dispatcher dispatcher;
- private TimelineClient client;
- private boolean publishSystemMetricsToATSv1;
+ private boolean publishSystemMetrics;
+ private boolean publishContainerMetrics;
+ protected RMContext rmContext;
- public SystemMetricsPublisher() {
+ public SystemMetricsPublisher(RMContext rmContext) {
super(SystemMetricsPublisher.class.getName());
+ this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
- publishSystemMetricsToATSv1 =
+ publishSystemMetrics =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
- && conf.getBoolean(
- YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
- YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
-
- if (publishSystemMetricsToATSv1) {
- client = TimelineClient.createTimelineClient();
- addIfService(client);
-
- dispatcher = createDispatcher(conf);
- dispatcher.register(SystemMetricsEventType.class,
- new ForwardingEventHandler());
- addIfService(dispatcher);
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+ if (publishSystemMetrics) {
+ TimelineServicePublisher timelineServicePublisher =
+ getTimelineServicePublisher(conf);
+ if (timelineServicePublisher != null) {
+ addService(timelineServicePublisher);
+ // init required to be called so that other methods of
+ // TimelineServicePublisher can be utilized
+ timelineServicePublisher.init(conf);
+ dispatcher = createDispatcher(timelineServicePublisher);
+ publishContainerMetrics =
+ timelineServicePublisher.publishRMContainerMetrics();
+ dispatcher.register(SystemMetricsEventType.class,
+ timelineServicePublisher.getEventHandler());
+ addIfService(dispatcher);
+ } else {
+ LOG.info("TimelineServicePublisher is not configured");
+ publishSystemMetrics = false;
+ }
LOG.info("YARN system metrics publishing service is enabled");
} else {
LOG.info("YARN system metrics publishing service is not enabled");
@@ -96,9 +92,26 @@ public class SystemMetricsPublisher extends CompositeService {
super.serviceInit(conf);
}
+  /**
+   * Obtains the dispatcher to use from the given publisher.
+   *
+   * @param timelineServicePublisher publisher that supplies the dispatcher
+   * @return the result of {@code timelineServicePublisher.getDispatcher()}
+   */
+  @VisibleForTesting
+  Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
+    Dispatcher publisherDispatcher = timelineServicePublisher.getDispatcher();
+    return publisherDispatcher;
+  }
+
+  /**
+   * Chooses the publisher implementation from two boolean settings, checked
+   * in order: RM_SYSTEM_METRICS_PUBLISHER_ENABLED selects the V1 publisher;
+   * otherwise SYSTEM_METRICS_PUBLISHER_ENABLED selects the V2 publisher.
+   *
+   * @param conf configuration the two settings are read from
+   * @return the selected publisher, or {@code null} if neither is enabled
+   */
+  TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
+    boolean rmPublisherEnabled = conf.getBoolean(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
+    if (rmPublisherEnabled) {
+      return new TimelineServiceV1Publisher();
+    }
+
+    // Only consulted when the RM-specific setting above is off.
+    boolean publisherEnabled = conf.getBoolean(
+        YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
+    if (!publisherEnabled) {
+      return null;
+    }
+    return new TimelineServiceV2Publisher(rmContext);
+  }
+
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
- if (publishSystemMetricsToATSv1) {
+ if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationCreatedEvent(
app.getApplicationId(),
@@ -113,7 +126,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
- if (publishSystemMetricsToATSv1) {
+ if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationFinishedEvent(
app.getApplicationId(),
@@ -123,14 +136,15 @@ public class SystemMetricsPublisher extends CompositeService {
app.getCurrentAppAttempt() == null ?
null : app.getCurrentAppAttempt().getAppAttemptId(),
finishedTime,
- app.getRMAppMetrics()));
+ app.getRMAppMetrics(),
+ (RMAppImpl)app));
}
}
@SuppressWarnings("unchecked")
public void appACLsUpdated(RMApp app, String appViewACLs,
long updatedTime) {
- if (publishSystemMetricsToATSv1) {
+ if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationACLsUpdatedEvent(
app.getApplicationId(),
@@ -142,7 +156,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
- if (publishSystemMetricsToATSv1) {
+ if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new AppAttemptRegisteredEvent(
appAttempt.getAppAttemptId(),
@@ -158,7 +172,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
- if (publishSystemMetricsToATSv1) {
+ if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new AppAttemptFinishedEvent(
appAttempt.getAppAttemptId(),
@@ -175,7 +189,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerCreated(RMContainer container, long createdTime) {
- if (publishSystemMetricsToATSv1) {
+ if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerCreatedEvent(
container.getContainerId(),
@@ -188,7 +202,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container, long finishedTime) {
- if (publishSystemMetricsToATSv1) {
+ if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerFinishedEvent(
container.getContainerId(),
@@ -199,317 +213,31 @@ public class SystemMetricsPublisher extends CompositeService {
}
}
- protected Dispatcher createDispatcher(Configuration conf) {
- MultiThreadedDispatcher dispatcher =
- new MultiThreadedDispatcher(
- conf.getInt(
- YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
- YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
- dispatcher.setDrainEventsOnStop();
- return dispatcher;
- }
-
- protected void handleSystemMetricsEvent(
- SystemMetricsEvent event) {
- switch (event.getType()) {
- case APP_CREATED:
- publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
- break;
- case APP_FINISHED:
- publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
- break;
- case APP_ACLS_UPDATED:
- publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
- break;
- case APP_ATTEMPT_REGISTERED:
- publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
- break;
- case APP_ATTEMPT_FINISHED:
- publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
- break;
- case CONTAINER_CREATED:
- publishContainerCreatedEvent((ContainerCreatedEvent) event);
- break;
- case CONTAINER_FINISHED:
- publishContainerFinishedEvent((ContainerFinishedEvent) event);
- break;
- default:
- LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
- }
- }
-
- private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
- TimelineEntity entity =
- createApplicationEntity(event.getApplicationId());
- Map<String, Object> entityInfo = new HashMap<String, Object>();
- entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
- event.getApplicationName());
- entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
- event.getApplicationType());
- entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
- event.getUser());
- entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
- event.getQueue());
- entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
- event.getSubmittedTime());
- entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
- event.getAppTags());
- entity.setOtherInfo(entityInfo);
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setEventType(
- ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
- entity.addEvent(tEvent);
- putEntity(entity);
+ @VisibleForTesting
+ boolean isPublishContainerMetrics() {
+ return publishContainerMetrics;
}
- private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
- TimelineEntity entity =
- createApplicationEntity(event.getApplicationId());
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setEventType(
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
- Map<String, Object> eventInfo = new HashMap<String, Object>();
- eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
- event.getDiagnosticsInfo());
- eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
- event.getFinalApplicationStatus().toString());
- eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
- event.getYarnApplicationState().toString());
- if (event.getLatestApplicationAttemptId() != null) {
- eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
- event.getLatestApplicationAttemptId().toString());
- }
- RMAppMetrics appMetrics = event.getAppMetrics();
- entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
- appMetrics.getVcoreSeconds());
- entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
- appMetrics.getMemorySeconds());
-
- tEvent.setEventInfo(eventInfo);
- entity.addEvent(tEvent);
- putEntity(entity);
- }
-
- private void publishApplicationACLsUpdatedEvent(
- ApplicationACLsUpdatedEvent event) {
- TimelineEntity entity =
- createApplicationEntity(event.getApplicationId());
- TimelineEvent tEvent = new TimelineEvent();
- Map<String, Object> entityInfo = new HashMap<String, Object>();
- entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
- event.getViewAppACLs());
- entity.setOtherInfo(entityInfo);
- tEvent.setEventType(
- ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
- entity.addEvent(tEvent);
- putEntity(entity);
- }
-
- private static TimelineEntity createApplicationEntity(
- ApplicationId applicationId) {
- TimelineEntity entity = new TimelineEntity();
- entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
- entity.setEntityId(applicationId.toString());
- return entity;
- }
-
- private void
- publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
- TimelineEntity entity =
- createAppAttemptEntity(event.getApplicationAttemptId());
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setEventType(
- AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
- Map<String, Object> eventInfo = new HashMap<String, Object>();
- eventInfo.put(
- AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
- event.getTrackingUrl());
- eventInfo.put(
- AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
- event.getOriginalTrackingURL());
- eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
- event.getHost());
- eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
- event.getRpcPort());
- eventInfo.put(
- AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
- event.getMasterContainerId().toString());
- tEvent.setEventInfo(eventInfo);
- entity.addEvent(tEvent);
- putEntity(entity);
- }
-
- private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
- TimelineEntity entity =
- createAppAttemptEntity(event.getApplicationAttemptId());
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
- Map<String, Object> eventInfo = new HashMap<String, Object>();
- eventInfo.put(
- AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
- event.getTrackingUrl());
- eventInfo.put(
- AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
- event.getOriginalTrackingURL());
- eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
- event.getDiagnosticsInfo());
- eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
- event.getFinalApplicationStatus().toString());
- eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
- event.getYarnApplicationAttemptState().toString());
- tEvent.setEventInfo(eventInfo);
- entity.addEvent(tEvent);
- putEntity(entity);
- }
-
- private static TimelineEntity createAppAttemptEntity(
- ApplicationAttemptId appAttemptId) {
- TimelineEntity entity = new TimelineEntity();
- entity.setEntityType(
- AppAttemptMetricsConstants.ENTITY_TYPE);
- entity.setEntityId(appAttemptId.toString());
- entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
- appAttemptId.getApplicationId().toString());
- return entity;
- }
-
- private void publishContainerCreatedEvent(ContainerCreatedEvent event) {
- TimelineEntity entity = createContainerEntity(event.getContainerId());
- Map<String, Object> entityInfo = new HashMap<String, Object>();
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
- event.getAllocatedResource().getMemory());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
- event.getAllocatedResource().getVirtualCores());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
- event.getAllocatedNode().getHost());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
- event.getAllocatedNode().getPort());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
- event.getAllocatedPriority().getPriority());
- entityInfo.put(
- ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
- event.getNodeHttpAddress());
- entity.setOtherInfo(entityInfo);
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
- entity.addEvent(tEvent);
- putEntity(entity);
- }
-
- private void publishContainerFinishedEvent(ContainerFinishedEvent event) {
- TimelineEntity entity = createContainerEntity(event.getContainerId());
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
- Map<String, Object> eventInfo = new HashMap<String, Object>();
- eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
- event.getDiagnosticsInfo());
- eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
- event.getContainerExitStatus());
- eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
- event.getContainerState().toString());
- tEvent.setEventInfo(eventInfo);
- entity.addEvent(tEvent);
- putEntity(entity);
- }
-
- private static TimelineEntity createContainerEntity(
- ContainerId containerId) {
- TimelineEntity entity = new TimelineEntity();
- entity.setEntityType(
- ContainerMetricsConstants.ENTITY_TYPE);
- entity.setEntityId(containerId.toString());
- entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
- containerId.getApplicationAttemptId().toString());
- return entity;
- }
-
- private void putEntity(TimelineEntity entity) {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Publishing the entity " + entity.getEntityId() +
- ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
- }
- client.putEntities(entity);
- } catch (Exception e) {
- LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
- + entity.getEntityId() + "]", e);
- }
+ @VisibleForTesting
+ Dispatcher getDispatcher() {
+ return dispatcher;
}
- /**
- * EventHandler implementation which forward events to SystemMetricsPublisher.
- * Making use of it, SystemMetricsPublisher can avoid to have a public handle
- * method.
- */
- private final class ForwardingEventHandler implements
- EventHandler<SystemMetricsEvent> {
+ interface TimelineServicePublisher extends Service {
+ /**
+ * @return the Dispatcher which needs to be used to dispatch events
+ */
+ Dispatcher getDispatcher();
- @Override
- public void handle(SystemMetricsEvent event) {
- handleSystemMetricsEvent(event);
- }
+ /**
+ * @return true if RM container metrics need to be sent
+ */
+ boolean publishRMContainerMetrics();
+ /**
+ * @return EventHandler which needs to be registered to the dispatcher to
+ * handle the SystemMetricsEvent
+ */
+ EventHandler<SystemMetricsEvent> getEventHandler();
}
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- protected static class MultiThreadedDispatcher extends CompositeService
- implements Dispatcher {
-
- private List<AsyncDispatcher> dispatchers =
- new ArrayList<AsyncDispatcher>();
-
- public MultiThreadedDispatcher(int num) {
- super(MultiThreadedDispatcher.class.getName());
- for (int i = 0; i < num; ++i) {
- AsyncDispatcher dispatcher = createDispatcher();
- dispatchers.add(dispatcher);
- addIfService(dispatcher);
- }
- }
-
- @Override
- public EventHandler getEventHandler() {
- return new CompositEventHandler();
- }
-
- @Override
- public void register(Class<? extends Enum> eventType, EventHandler handler) {
- for (AsyncDispatcher dispatcher : dispatchers) {
- dispatcher.register(eventType, handler);
- }
- }
-
- public void setDrainEventsOnStop() {
- for (AsyncDispatcher dispatcher : dispatchers) {
- dispatcher.setDrainEventsOnStop();
- }
- }
-
- private class CompositEventHandler implements EventHandler<Event> {
-
- @Override
- public void handle(Event event) {
- // Use hashCode (of ApplicationId) to dispatch the event to the child
- // dispatcher, such that all the writing events of one application will
- // be handled by one thread, the scheduled order of the these events
- // will be preserved
- int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
- dispatchers.get(index).getEventHandler().handle(event);
- }
-
- }
-
- protected AsyncDispatcher createDispatcher() {
- return new AsyncDispatcher();
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
new file mode 100644
index 0000000..c4a85b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+public class TimelineServiceV1Publisher extends
+ AbstractTimelineServicePublisher {
+
+ private static final Log LOG = LogFactory
+ .getLog(TimelineServiceV1Publisher.class);
+
+ /**
+  * Creates the V1 publisher, passing the fixed service name
+  * "TimelineserviceV1Publisher" to the superclass constructor.
+  */
+ public TimelineServiceV1Publisher() {
+ super("TimelineserviceV1Publisher");
+ }
+
+ private TimelineClient client;
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ client = TimelineClient.createTimelineClient();
+ addIfService(client);
+ super.serviceInit(conf);
+ }
+
+ /**
+  * Publishes an application entity describing a newly created application.
+  * The name, type, user, queue, submitted time and tags are stored as the
+  * entity's other info, and one event of type
+  * {@code ApplicationMetricsConstants.CREATED_EVENT_TYPE} carries the
+  * timestamp of the incoming event.
+  *
+  * @param event the application-created event to publish
+  */
+ @Override
+ void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
+   Map<String, Object> otherInfo = new HashMap<String, Object>();
+   otherInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
+       event.getApplicationName());
+   otherInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+       event.getApplicationType());
+   otherInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
+       event.getUser());
+   otherInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
+       event.getQueue());
+   otherInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
+       event.getSubmittedTime());
+   otherInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
+       event.getAppTags());
+
+   TimelineEvent createdEvent = new TimelineEvent();
+   createdEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+   createdEvent.setTimestamp(event.getTimestamp());
+
+   TimelineEntity appEntity =
+       createApplicationEntity(event.getApplicationId());
+   appEntity.setOtherInfo(otherInfo);
+   appEntity.addEvent(createdEvent);
+   putEntity(appEntity);
+ }
+
+ /**
+  * Publishes an application entity describing a finished application.
+  * Diagnostics, final status, application state and (when present) the
+  * latest attempt id are stored as event info; the vcore-seconds and
+  * memory-seconds metrics are stored as the entity's other info.
+  *
+  * @param event the application-finished event to publish
+  */
+ @Override
+ void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+       event.getDiagnosticsInfo());
+   info.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+       event.getFinalApplicationStatus().toString());
+   info.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+       event.getYarnApplicationState().toString());
+   // The latest attempt id is optional and only recorded when available.
+   if (event.getLatestApplicationAttemptId() != null) {
+     info.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
+         event.getLatestApplicationAttemptId().toString());
+   }
+
+   TimelineEvent finishedEvent = new TimelineEvent();
+   finishedEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+   finishedEvent.setTimestamp(event.getTimestamp());
+   finishedEvent.setEventInfo(info);
+
+   TimelineEntity appEntity =
+       createApplicationEntity(event.getApplicationId());
+   RMAppMetrics metrics = event.getAppMetrics();
+   appEntity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
+       metrics.getVcoreSeconds());
+   appEntity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
+       metrics.getMemorySeconds());
+   appEntity.addEvent(finishedEvent);
+   putEntity(appEntity);
+ }
+
+ /**
+  * Publishes an application entity whose other info holds the updated
+  * view ACLs, together with one event of type
+  * {@code ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE}.
+  *
+  * @param event the ACLs-updated event to publish
+  */
+ @Override
+ void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
+   Map<String, Object> otherInfo = new HashMap<String, Object>();
+   otherInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+       event.getViewAppACLs());
+
+   TimelineEvent aclsUpdatedEvent = new TimelineEvent();
+   aclsUpdatedEvent.setEventType(
+       ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
+   aclsUpdatedEvent.setTimestamp(event.getTimestamp());
+
+   TimelineEntity appEntity =
+       createApplicationEntity(event.getApplicationId());
+   appEntity.setOtherInfo(otherInfo);
+   appEntity.addEvent(aclsUpdatedEvent);
+   putEntity(appEntity);
+ }
+
+ /**
+  * Builds an empty timeline entity of type
+  * {@code ApplicationMetricsConstants.ENTITY_TYPE} whose id is the string
+  * form of the given application id.
+  *
+  * @param applicationId id of the application the entity represents
+  * @return the newly created entity
+  */
+ private static TimelineEntity createApplicationEntity(
+     ApplicationId applicationId) {
+   TimelineEntity appEntity = new TimelineEntity();
+   appEntity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
+   appEntity.setEntityId(applicationId.toString());
+   return appEntity;
+ }
+
+ /**
+  * Publishes an application attempt entity carrying a registration event.
+  * Tracking URL, original tracking URL, host, RPC port and master
+  * container id are stored as event info.
+  *
+  * @param event the attempt-registered event to publish
+  */
+ @Override
+ void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+       event.getTrackingUrl());
+   info.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+       event.getOriginalTrackingURL());
+   info.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+   info.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
+       event.getRpcPort());
+   info.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+       event.getMasterContainerId().toString());
+
+   TimelineEvent registeredEvent = new TimelineEvent();
+   registeredEvent.setEventType(
+       AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+   registeredEvent.setTimestamp(event.getTimestamp());
+   registeredEvent.setEventInfo(info);
+
+   TimelineEntity attemptEntity =
+       createAppAttemptEntity(event.getApplicationAttemptId());
+   attemptEntity.addEvent(registeredEvent);
+   putEntity(attemptEntity);
+ }
+
+ /**
+  * Publishes an application attempt entity carrying a finish event.
+  * Tracking URL, original tracking URL, diagnostics, final status and
+  * attempt state are stored as event info.
+  *
+  * @param event the attempt-finished event to publish
+  */
+ @Override
+ void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+       event.getTrackingUrl());
+   info.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+       event.getOriginalTrackingURL());
+   info.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+       event.getDiagnosticsInfo());
+   info.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+       event.getFinalApplicationStatus().toString());
+   info.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
+       event.getYarnApplicationAttemptState().toString());
+
+   TimelineEvent finishedEvent = new TimelineEvent();
+   finishedEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+   finishedEvent.setTimestamp(event.getTimestamp());
+   finishedEvent.setEventInfo(info);
+
+   TimelineEntity attemptEntity =
+       createAppAttemptEntity(event.getApplicationAttemptId());
+   attemptEntity.addEvent(finishedEvent);
+   putEntity(attemptEntity);
+ }
+
+ /**
+  * Builds an empty timeline entity of type
+  * {@code AppAttemptMetricsConstants.ENTITY_TYPE} for the given attempt.
+  * The owning application's id is added as a primary filter under
+  * {@code AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER}.
+  *
+  * @param appAttemptId id of the attempt the entity represents
+  * @return the newly created entity
+  */
+ private static TimelineEntity createAppAttemptEntity(
+     ApplicationAttemptId appAttemptId) {
+   TimelineEntity attemptEntity = new TimelineEntity();
+   attemptEntity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
+   attemptEntity.setEntityId(appAttemptId.toString());
+   String parentAppId = appAttemptId.getApplicationId().toString();
+   attemptEntity.addPrimaryFilter(
+       AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, parentAppId);
+   return attemptEntity;
+ }
+
+ /**
+  * Publishes a container entity describing a newly created container.
+  * Allocated memory, vcores, host, port, priority and the node HTTP
+  * address are stored as the entity's other info, and one event of type
+  * {@code ContainerMetricsConstants.CREATED_EVENT_TYPE} carries the
+  * timestamp of the incoming event.
+  *
+  * @param event the container-created event to publish
+  */
+ @Override
+ void publishContainerCreatedEvent(ContainerCreatedEvent event) {
+   Map<String, Object> otherInfo = new HashMap<String, Object>();
+   otherInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+       event.getAllocatedResource().getMemory());
+   otherInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+       event.getAllocatedResource().getVirtualCores());
+   otherInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+       event.getAllocatedNode().getHost());
+   otherInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+       event.getAllocatedNode().getPort());
+   otherInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+       event.getAllocatedPriority().getPriority());
+   otherInfo.put(
+       ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+       event.getNodeHttpAddress());
+
+   TimelineEvent createdEvent = new TimelineEvent();
+   createdEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+   createdEvent.setTimestamp(event.getTimestamp());
+
+   TimelineEntity containerEntity =
+       createContainerEntity(event.getContainerId());
+   containerEntity.setOtherInfo(otherInfo);
+   containerEntity.addEvent(createdEvent);
+   putEntity(containerEntity);
+ }
+
+ /**
+  * Publishes a container entity carrying a finish event. Diagnostics,
+  * exit status and container state are stored as event info.
+  *
+  * @param event the container-finished event to publish
+  */
+ @Override
+ void publishContainerFinishedEvent(ContainerFinishedEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+       event.getDiagnosticsInfo());
+   info.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+       event.getContainerExitStatus());
+   info.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+       event.getContainerState().toString());
+
+   TimelineEvent finishedEvent = new TimelineEvent();
+   finishedEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+   finishedEvent.setTimestamp(event.getTimestamp());
+   finishedEvent.setEventInfo(info);
+
+   TimelineEntity containerEntity =
+       createContainerEntity(event.getContainerId());
+   containerEntity.addEvent(finishedEvent);
+   putEntity(containerEntity);
+ }
+
+ /**
+  * Builds an empty timeline entity of type
+  * {@code ContainerMetricsConstants.ENTITY_TYPE} for the given container.
+  * The owning application attempt's id is added as a primary filter under
+  * {@code ContainerMetricsConstants.PARENT_PRIMARIY_FILTER}.
+  *
+  * @param containerId id of the container the entity represents
+  * @return the newly created entity
+  */
+ private static TimelineEntity createContainerEntity(ContainerId containerId) {
+   TimelineEntity containerEntity = new TimelineEntity();
+   containerEntity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE);
+   containerEntity.setEntityId(containerId.toString());
+   String parentAttemptId = containerId.getApplicationAttemptId().toString();
+   containerEntity.addPrimaryFilter(
+       ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, parentAttemptId);
+   return containerEntity;
+ }
+
+ private void putEntity(TimelineEntity entity) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Publishing the entity " + entity.getEntityId()
+ + ", JSON-style content: "
+ + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ }
+ client.putEntities(entity);
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+ + entity.getEntityId() + "]", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
new file mode 100644
index 0000000..81ce54c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This class is responsible for posting application, application attempt and
+ * container lifecycle events to timeline service v2.
+ */
+@Private
+@Unstable
+public class TimelineServiceV2Publisher extends
+ AbstractTimelineServicePublisher {
+ private static final Log LOG = LogFactory
+ .getLog(TimelineServiceV2Publisher.class);
+ protected RMTimelineCollectorManager rmTimelineCollectorManager;
+
+ /**
+  * Creates the V2 publisher under the service name
+  * "TimelineserviceV2Publisher" and takes the timeline collector manager
+  * from the given RM context.
+  *
+  * @param rmContext context supplying the RM timeline collector manager
+  */
+ public TimelineServiceV2Publisher(RMContext rmContext) {
+   super("TimelineserviceV2Publisher");
+   this.rmTimelineCollectorManager =
+       rmContext.getRMTimelineCollectorManager();
+ }
+
+ private boolean publishContainerMetrics;
+
+ /**
+  * Reads from the configuration whether RM container metrics should be
+  * published, then delegates to the superclass initialization.
+  *
+  * @param conf configuration to read the flag from
+  * @throws Exception if the superclass initialization fails
+  */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+   boolean containerMetricsEnabled = conf.getBoolean(
+       YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+       YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+   this.publishContainerMetrics = containerMetricsEnabled;
+   super.serviceInit(conf);
+ }
+
+ /**
+  * Publishes an application entity describing a newly created application.
+  * The queue is set directly on the entity; name, type, user, submitted
+  * time and tags are stored as entity info, and one event with id
+  * {@code ApplicationMetricsConstants.CREATED_EVENT_TYPE} carries the
+  * timestamp of the incoming event.
+  *
+  * @param event the application-created event to publish
+  */
+ @Override
+ void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
+       event.getApplicationName());
+   info.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+       event.getApplicationType());
+   info.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
+       event.getUser());
+   info.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
+       event.getSubmittedTime());
+   info.put(ApplicationMetricsConstants.APP_TAGS_INFO,
+       event.getAppTags());
+
+   TimelineEvent createdEvent = new TimelineEvent();
+   createdEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+   createdEvent.setTimestamp(event.getTimestamp());
+
+   ApplicationEntity appEntity =
+       createApplicationEntity(event.getApplicationId());
+   appEntity.setQueue(event.getQueue());
+   appEntity.setInfo(info);
+   appEntity.addEvent(createdEvent);
+
+   putEntity(appEntity, event.getApplicationId());
+ }
+
+ /**
+  * Publishes an application entity describing a finished application and
+  * then stops the application's timeline collector. Vcore-seconds and
+  * memory-seconds metrics are stored as entity info; diagnostics, final
+  * status, application state and (when present) the latest attempt id are
+  * stored as event info.
+  *
+  * @param event the application-finished event to publish
+  */
+ @Override
+ void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+       event.getDiagnosticsInfo());
+   info.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+       event.getFinalApplicationStatus().toString());
+   info.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+       event.getYarnApplicationState().toString());
+   // The latest attempt id is optional and only recorded when available.
+   if (event.getLatestApplicationAttemptId() != null) {
+     info.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
+         event.getLatestApplicationAttemptId().toString());
+   }
+
+   TimelineEvent finishedEvent = new TimelineEvent();
+   finishedEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+   finishedEvent.setTimestamp(event.getTimestamp());
+   finishedEvent.setInfo(info);
+
+   ApplicationEntity appEntity =
+       createApplicationEntity(event.getApplicationId());
+   RMAppMetrics metrics = event.getAppMetrics();
+   appEntity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
+       metrics.getVcoreSeconds());
+   appEntity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
+       metrics.getMemorySeconds());
+   appEntity.addEvent(finishedEvent);
+   putEntity(appEntity, event.getApplicationId());
+
+   // Stop the collector only after the finish entity has been handed over,
+   // since publishing looks the collector up by application id.
+   event.getApp().stopTimelineCollector();
+ }
+
+ /**
+  * Publishes an application entity whose info holds the updated view ACLs.
+  *
+  * @param event the ACLs-updated event to publish
+  */
+ @Override
+ void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
+   // NOTE(review): no TimelineEvent is attached here, so the update
+   // timestamp is not recorded; confirm this is intentional.
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
+       event.getViewAppACLs());
+
+   ApplicationEntity appEntity =
+       createApplicationEntity(event.getApplicationId());
+   appEntity.setInfo(info);
+
+   putEntity(appEntity, event.getApplicationId());
+ }
+
+ /**
+  * Builds an empty application entity whose id is the string form of the
+  * given application id.
+  *
+  * @param applicationId id of the application the entity represents
+  * @return the newly created entity
+  */
+ private static ApplicationEntity createApplicationEntity(
+     ApplicationId applicationId) {
+   ApplicationEntity appEntity = new ApplicationEntity();
+   appEntity.setId(applicationId.toString());
+   return appEntity;
+ }
+
+ /**
+  * Publishes an application attempt entity carrying a registration event.
+  * Tracking URL, original tracking URL, host, RPC port and master
+  * container id are stored as event info.
+  *
+  * @param event the attempt-registered event to publish
+  */
+ @Override
+ void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+       event.getTrackingUrl());
+   info.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+       event.getOriginalTrackingURL());
+   info.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+   info.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
+       event.getRpcPort());
+   info.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+       event.getMasterContainerId().toString());
+
+   TimelineEvent registeredEvent = new TimelineEvent();
+   registeredEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+   registeredEvent.setTimestamp(event.getTimestamp());
+   registeredEvent.setInfo(info);
+
+   TimelineEntity attemptEntity =
+       createAppAttemptEntity(event.getApplicationAttemptId());
+   attemptEntity.addEvent(registeredEvent);
+   putEntity(attemptEntity,
+       event.getApplicationAttemptId().getApplicationId());
+ }
+
+ /**
+  * Publishes an application attempt entity carrying a finish event.
+  * Tracking URL, original tracking URL, diagnostics, final status and
+  * attempt state are stored as event info.
+  *
+  * @param event the attempt-finished event to publish
+  */
+ @Override
+ void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+       event.getTrackingUrl());
+   info.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+       event.getOriginalTrackingURL());
+   info.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+       event.getDiagnosticsInfo());
+   info.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+       event.getFinalApplicationStatus().toString());
+   info.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
+       event.getYarnApplicationAttemptState().toString());
+
+   TimelineEvent finishedEvent = new TimelineEvent();
+   finishedEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+   finishedEvent.setTimestamp(event.getTimestamp());
+   finishedEvent.setInfo(info);
+
+   ApplicationAttemptEntity attemptEntity =
+       createAppAttemptEntity(event.getApplicationAttemptId());
+   attemptEntity.addEvent(finishedEvent);
+   putEntity(attemptEntity,
+       event.getApplicationAttemptId().getApplicationId());
+ }
+
+ /**
+  * Publishes a container entity carrying an event with id
+  * {@code ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE}. Allocated
+  * memory, vcores, host, port, priority and the node HTTP address are
+  * stored as event info.
+  *
+  * @param event the container-created event to publish
+  */
+ @Override
+ void publishContainerCreatedEvent(ContainerCreatedEvent event) {
+   // The allocation details go into the event info rather than the entity
+   // info, because the entity info is updated by the NM.
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+       event.getAllocatedResource().getMemory());
+   info.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+       event.getAllocatedResource().getVirtualCores());
+   info.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+       event.getAllocatedNode().getHost());
+   info.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+       event.getAllocatedNode().getPort());
+   info.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+       event.getAllocatedPriority().getPriority());
+   info.put(
+       ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+       event.getNodeHttpAddress());
+
+   TimelineEvent createdEvent = new TimelineEvent();
+   createdEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+   createdEvent.setTimestamp(event.getTimestamp());
+   createdEvent.setInfo(info);
+
+   TimelineEntity containerEntity =
+       createContainerEntity(event.getContainerId());
+   containerEntity.addEvent(createdEvent);
+   putEntity(containerEntity,
+       event.getContainerId().getApplicationAttemptId().getApplicationId());
+ }
+
+ /**
+  * Publishes a container entity carrying an event with id
+  * {@code ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE}.
+  * Diagnostics, exit status and container state are stored as event info.
+  *
+  * @param event the container-finished event to publish
+  */
+ @Override
+ void publishContainerFinishedEvent(ContainerFinishedEvent event) {
+   Map<String, Object> info = new HashMap<String, Object>();
+   info.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+       event.getDiagnosticsInfo());
+   info.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+       event.getContainerExitStatus());
+   info.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+       event.getContainerState().toString());
+
+   TimelineEvent finishedEvent = new TimelineEvent();
+   finishedEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
+   finishedEvent.setTimestamp(event.getTimestamp());
+   finishedEvent.setInfo(info);
+
+   TimelineEntity containerEntity =
+       createContainerEntity(event.getContainerId());
+   containerEntity.addEvent(finishedEvent);
+   putEntity(containerEntity,
+       event.getContainerId().getApplicationAttemptId().getApplicationId());
+ }
+
+ /**
+  * Builds an empty container entity for the given container id, with the
+  * owning application attempt set as its parent.
+  *
+  * @param containerId id of the container the entity represents
+  * @return the newly created entity
+  */
+ private static ContainerEntity createContainerEntity(ContainerId containerId) {
+   ContainerEntity containerEntity = new ContainerEntity();
+   containerEntity.setId(containerId.toString());
+   Identifier parentAttempt = new Identifier(
+       TimelineEntityType.YARN_APPLICATION_ATTEMPT.name(),
+       containerId.getApplicationAttemptId().toString());
+   containerEntity.setParent(parentAttempt);
+   return containerEntity;
+ }
+
+ /**
+  * Hands the entity to the timeline collector registered for the given
+  * application. Publishing is best effort: a missing collector or any
+  * exception is logged and never rethrown.
+  *
+  * @param entity the entity to publish
+  * @param appId id of the application whose collector receives the entity
+  */
+ private void putEntity(TimelineEntity entity, ApplicationId appId) {
+   try {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+           + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+     }
+     TimelineCollector timelineCollector =
+         rmTimelineCollectorManager.get(appId);
+     // The lookup can come back empty, for instance once the application's
+     // collector has been stopped. Report that explicitly instead of
+     // letting it surface as a NullPointerException in the catch below.
+     if (timelineCollector == null) {
+       LOG.warn("Cannot find an active timeline collector for application "
+           + appId + ", dropping entity " + entity);
+       return;
+     }
+     TimelineEntities entities = new TimelineEntities();
+     entities.addEntity(entity);
+     timelineCollector.putEntities(entities,
+         UserGroupInformation.getCurrentUser());
+   } catch (Exception e) {
+     LOG.error("Error when publishing entity " + entity, e);
+   }
+ }
+
+ /**
+  * Builds an empty application attempt entity for the given attempt id,
+  * with the owning application set as its parent.
+  *
+  * @param appAttemptId id of the attempt the entity represents
+  * @return the newly created entity
+  */
+ private static ApplicationAttemptEntity createAppAttemptEntity(
+     ApplicationAttemptId appAttemptId) {
+   ApplicationAttemptEntity attemptEntity = new ApplicationAttemptEntity();
+   attemptEntity.setId(appAttemptId.toString());
+   Identifier parentApp = new Identifier(
+       TimelineEntityType.YARN_APPLICATION.name(),
+       appAttemptId.getApplicationId().toString());
+   attemptEntity.setParent(parentApp);
+   return attemptEntity;
+ }
+
+ /**
+  * Returns the container-metrics flag that {@code serviceInit} read from
+  * {@code YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED}.
+  *
+  * @return the configured value of the flag
+  */
+ @Override
+ public boolean publishRMContainerMetrics() {
+ return publishContainerMetrics;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index a68fc77..c2155da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1302,8 +1302,6 @@ public class RMAppImpl implements RMApp, Recoverable {
.applicationFinished(app, finalState);
app.rmContext.getSystemMetricsPublisher()
.appFinished(app, finalState, app.finishTime);
-
- app.stopTimelineCollector();
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 6db1745..13c2ab3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -90,7 +90,7 @@ public class TestRMAppLogAggregationStatus {
new RMContextImpl(rmDispatcher, null, null, null,
null, null, null, null, null,
new RMApplicationHistoryWriter());
- rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+ rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/17842a3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index b122bc4..41b7a8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -45,7 +45,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -71,7 +73,7 @@ public class TestSystemMetricsPublisher {
public static void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
@@ -85,7 +87,7 @@ public class TestSystemMetricsPublisher {
timelineServer.start();
store = timelineServer.getTimelineStore();
- metricsPublisher = new SystemMetricsPublisher();
+ metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
metricsPublisher.init(conf);
metricsPublisher.start();
}
@@ -339,7 +341,7 @@ public class TestSystemMetricsPublisher {
}
private static RMApp createRMApp(ApplicationId appId) {
- RMApp app = mock(RMApp.class);
+ RMApp app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
when(app.getName()).thenReturn("test app");
when(app.getApplicationType()).thenReturn("test app type");