You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2018/01/16 03:01:35 UTC
hadoop git commit: YARN-6736. Consider writing to both ats v1 & v2
from RM for smoother upgrades. Contributed by Aaron Gresch.
Repository: hadoop
Updated Branches:
refs/heads/trunk a0c71dcc3 -> d09058b2f
YARN-6736. Consider writing to both ats v1 & v2 from RM for smoother upgrades. Contributed by Aaron Gresch.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d09058b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d09058b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d09058b2
Branch: refs/heads/trunk
Commit: d09058b2fd18803d12f0835fdf78aef5e0b99c90
Parents: a0c71dc
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Tue Jan 16 07:58:29 2018 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Tue Jan 16 07:58:29 2018 +0530
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 61 ++-
.../distributedshell/ApplicationMaster.java | 48 +-
.../distributedshell/TestDSAppMaster.java | 86 +++-
.../yarn/client/api/impl/YarnClientImpl.java | 8 +-
.../client/api/impl/TimelineClientImpl.java | 6 +-
.../client/api/impl/TimelineV2ClientImpl.java | 3 +-
.../yarn/util/timeline/TimelineUtils.java | 3 +-
.../server/resourcemanager/ResourceManager.java | 44 +-
.../metrics/CombinedSystemMetricsPublisher.java | 108 +++++
.../resourcemanager/TestRMTimelineService.java | 122 +++++
.../TestCombinedSystemMetricsPublisher.java | 476 +++++++++++++++++++
.../hadoop/yarn/server/MiniYARNCluster.java | 15 +-
12 files changed, 912 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c892cfb..271b666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.yarn.conf;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -2271,6 +2273,9 @@ public class YarnConfiguration extends Configuration {
+ "version";
public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f;
+ public static final String TIMELINE_SERVICE_VERSIONS =
+ TIMELINE_SERVICE_PREFIX + "versions";
+
/**
* Comma separated list of names for UIs hosted in the timeline server
* (For pluggable UIs).
@@ -3636,8 +3641,60 @@ public class YarnConfiguration extends Configuration {
* version greater than equal to 2 but smaller than 3.
*/
public static boolean timelineServiceV2Enabled(Configuration conf) {
- return timelineServiceEnabled(conf) &&
- (int)getTimelineServiceVersion(conf) == 2;
+ boolean enabled = false;
+ if (timelineServiceEnabled(conf)) {
+ Collection<Float> versions = getTimelineServiceVersions(conf);
+ for (Float version : versions) {
+ if (version.intValue() == 2) {
+ enabled = true;
+ break;
+ }
+ }
+ }
+ return enabled;
+ }
+
+ /**
+ * Returns whether the timeline service v.1 is enabled via configuration.
+ *
+ * @param conf the configuration
+ * @return whether the timeline service v.1 is enabled. V.1 refers to a
+ * version greater than equal to 1 but smaller than 2.
+ */
+ public static boolean timelineServiceV1Enabled(Configuration conf) {
+ boolean enabled = false;
+ if (timelineServiceEnabled(conf)) {
+ Collection<Float> versions = getTimelineServiceVersions(conf);
+ for (Float version : versions) {
+ if (version.intValue() == 1) {
+ enabled = true;
+ break;
+ }
+ }
+ }
+ return enabled;
+ }
+
+ /**
+ * Returns all the active timeline service versions. It does not check
+ * whether the timeline service itself is enabled.
+ *
+ * @param conf the configuration
+ * @return the timeline service versions as a collection of floats.
+ */
+ private static Collection<Float> getTimelineServiceVersions(
+ Configuration conf) {
+ String versions = conf.get(TIMELINE_SERVICE_VERSIONS);
+ if (versions == null) {
+ versions = Float.toString(getTimelineServiceVersion(conf));
+ }
+ List<String> stringList = Arrays.asList(versions.split(","));
+ List<Float> floatList = new ArrayList<Float>();
+ for (String s : stringList) {
+ Float f = Float.parseFloat(s);
+ floatList.add(f);
+ }
+ return floatList;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index b35a2c9..bd810c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -323,7 +323,8 @@ public class ApplicationMaster {
TimelineClient timelineClient;
// Timeline v2 Client
- private TimelineV2Client timelineV2Client;
+ @VisibleForTesting
+ TimelineV2Client timelineV2Client;
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
static final String APPID_TIMELINE_FILTER_NAME = "appId";
@@ -632,11 +633,7 @@ public class ApplicationMaster {
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
- if (YarnConfiguration.timelineServiceEnabled(conf)) {
- timelineServiceV2Enabled =
- ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
- timelineServiceV1Enabled = !timelineServiceV2Enabled;
- } else {
+ if (!YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient = null;
timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
@@ -704,12 +701,11 @@ public class ApplicationMaster {
if (timelineServiceV2Enabled) {
// need to bind timelineClient
amRMClient.registerTimelineV2Client(timelineV2Client);
- }
-
- if (timelineServiceV2Enabled) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_START);
- } else if (timelineServiceV1Enabled) {
+ }
+
+ if (timelineServiceV1Enabled) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
@@ -784,18 +780,23 @@ public class ApplicationMaster {
@Override
public Void run() throws Exception {
if (YarnConfiguration.timelineServiceEnabled(conf)) {
+ timelineServiceV1Enabled =
+ YarnConfiguration.timelineServiceV1Enabled(conf);
+ timelineServiceV2Enabled =
+ YarnConfiguration.timelineServiceV2Enabled(conf);
// Creating the Timeline Client
+ if (timelineServiceV1Enabled) {
+ timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(conf);
+ timelineClient.start();
+ LOG.info("Timeline service V1 client is enabled");
+ }
if (timelineServiceV2Enabled) {
timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
timelineV2Client.init(conf);
timelineV2Client.start();
LOG.info("Timeline service V2 client is enabled");
- } else {
- timelineClient = TimelineClient.createTimelineClient();
- timelineClient.init(conf);
- timelineClient.start();
- LOG.info("Timeline service V1 client is enabled");
}
} else {
timelineClient = null;
@@ -825,12 +826,14 @@ public class ApplicationMaster {
} catch (InterruptedException ex) {}
}
+ if (timelineServiceV1Enabled) {
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+ DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+ }
+
if (timelineServiceV2Enabled) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_END);
- } else if (timelineServiceV1Enabled) {
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
- DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
// Join all launched threads
@@ -881,7 +884,8 @@ public class ApplicationMaster {
// Stop Timeline Client
if(timelineServiceV1Enabled) {
timelineClient.stop();
- } else if (timelineServiceV2Enabled) {
+ }
+ if (timelineServiceV2Enabled) {
timelineV2Client.stop();
}
@@ -947,7 +951,8 @@ public class ApplicationMaster {
}
publishContainerEndEventOnTimelineServiceV2(containerStatus,
containerStartTime);
- } else if (timelineServiceV1Enabled) {
+ }
+ if (timelineServiceV1Enabled) {
publishContainerEndEvent(timelineClient, containerStatus, domainId,
appSubmitterUgi);
}
@@ -1113,7 +1118,8 @@ public class ApplicationMaster {
applicationMaster.getContainerStartTimes().put(containerId, startTime);
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
container, startTime);
- } else if (applicationMaster.timelineServiceV1Enabled) {
+ }
+ if (applicationMaster.timelineServiceV1Enabled) {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
index 2789d04..f11bdf8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -33,6 +35,8 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -167,14 +171,82 @@ public class TestDSAppMaster {
}
@Test
- public void testTimelineClientInDSAppMaster() throws Exception {
+ public void testTimelineClientInDSAppMasterV1() throws Exception {
+ runTimelineClientInDSAppMaster(true, false);
+ }
+
+ @Test
+ public void testTimelineClientInDSAppMasterV2() throws Exception {
+ runTimelineClientInDSAppMaster(false, true);
+ }
+
+ @Test
+ public void testTimelineClientInDSAppMasterV1V2() throws Exception {
+ runTimelineClientInDSAppMaster(true, true);
+ }
+
+ @Test
+ public void testTimelineClientInDSAppMasterDisabled() throws Exception {
+ runTimelineClientInDSAppMaster(false, false);
+ }
+
+ private void runTimelineClientInDSAppMaster(boolean v1Enabled,
+ boolean v2Enabled) throws Exception {
+ ApplicationMaster appMaster = createAppMasterWithStartedTimelineService(
+ v1Enabled, v2Enabled);
+ validateAppMasterTimelineService(v1Enabled, v2Enabled, appMaster);
+ }
+
+ private void validateAppMasterTimelineService(boolean v1Enabled,
+ boolean v2Enabled, ApplicationMaster appMaster) {
+ if (v1Enabled) {
+ Assert.assertEquals(appMaster.appSubmitterUgi,
+ ((TimelineClientImpl)appMaster.timelineClient).getUgi());
+ } else {
+ Assert.assertNull(appMaster.timelineClient);
+ }
+ if (v2Enabled) {
+ Assert.assertNotNull(appMaster.timelineV2Client);
+ } else {
+ Assert.assertNull(appMaster.timelineV2Client);
+ }
+ }
+
+ private ApplicationMaster createAppMasterWithStartedTimelineService(
+ boolean v1Enabled, boolean v2Enabled) throws Exception {
ApplicationMaster appMaster = new ApplicationMaster();
- appMaster.appSubmitterUgi =
- UserGroupInformation.createUserForTesting("foo", new String[]{"bar"});
- Configuration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ appMaster.appSubmitterUgi = UserGroupInformation
+ .createUserForTesting("foo", new String[] {"bar"});
+ Configuration conf = this.getTimelineServiceConf(v1Enabled, v2Enabled);
+ ApplicationId appId = ApplicationId.newInstance(1L, 1);
+ appMaster.appAttemptID = ApplicationAttemptId.newInstance(appId, 1);
appMaster.startTimelineClient(conf);
- Assert.assertEquals(appMaster.appSubmitterUgi,
- ((TimelineClientImpl)appMaster.timelineClient).getUgi());
+ return appMaster;
+ }
+
+ private Configuration getTimelineServiceConf(boolean v1Enabled,
+ boolean v2Enabled) {
+ Configuration conf = new YarnConfiguration(new Configuration(false));
+ Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf));
+
+ if (v1Enabled || v2Enabled) {
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ }
+
+ if (v1Enabled) {
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+ }
+
+ if (v2Enabled) {
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+ conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+ }
+
+ if (v1Enabled && v2Enabled) {
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
+ }
+ return conf;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 9a9978d..072e606 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -177,13 +177,7 @@ public class YarnClientImpl extends YarnClient {
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
}
- float timelineServiceVersion =
- conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
- && ((Float.compare(timelineServiceVersion, 1.0f) == 0)
- || (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
+ if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
timelineV1ServiceEnabled = true;
timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
timelineService = TimelineUtils.buildTimelineTokenService(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index f49618b..44d6d48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -99,14 +99,12 @@ public class TimelineClientImpl extends TimelineClient {
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
- LOG.info("Timeline service address: " + getTimelineServiceAddress());
- if (!YarnConfiguration.timelineServiceEnabled(conf)
- || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
- || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
+ if (!YarnConfiguration.timelineServiceV1Enabled(conf)) {
throw new IOException("Timeline V1 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 1.x");
}
+ LOG.info("Timeline service address: " + getTimelineServiceAddress());
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
index 220d6af..02c9519 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -94,8 +94,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
}
protected void serviceInit(Configuration conf) throws Exception {
- if (!YarnConfiguration.timelineServiceEnabled(conf)
- || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
+ if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
throw new IOException("Timeline V2 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 2");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index 3b12f3c..a0c4b72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -118,7 +118,8 @@ public class TimelineUtils {
}
/**
- * Returns whether the timeline service v.1.5 is enabled via configuration.
+ * Returns whether the timeline service v.1.5 is enabled by default via
+ * configuration.
*
* @param conf the configuration
* @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index a0317f6..8641842 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPub
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@@ -513,26 +514,33 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
- SystemMetricsPublisher publisher;
- if (YarnConfiguration.timelineServiceEnabled(conf) &&
- YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
- if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
- // we're dealing with the v.2.x publisher
- LOG.info("system metrics publisher with the timeline service V2 is " +
- "configured");
- publisher = new TimelineServiceV2Publisher(
- rmContext.getRMTimelineCollectorManager());
- } else {
- // we're dealing with the v.1.x publisher
- LOG.info("system metrics publisher with the timeline service V1 is " +
- "configured");
- publisher = new TimelineServiceV1Publisher();
- }
- } else {
+ List<SystemMetricsPublisher> publishers =
+ new ArrayList<SystemMetricsPublisher>();
+ if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
+ SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
+ publishers.add(publisherV1);
+ }
+ if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+ // we're dealing with the v.2.x publisher
+ LOG.info("system metrics publisher with the timeline service V2 is "
+ + "configured");
+ SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
+ rmContext.getRMTimelineCollectorManager());
+ publishers.add(publisherV2);
+ }
+ if (publishers.isEmpty()) {
LOG.info("TimelineServicePublisher is not configured");
- publisher = new NoOpSystemMetricPublisher();
+ SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
+ publishers.add(noopPublisher);
+ }
+
+ for (SystemMetricsPublisher publisher : publishers) {
+ addIfService(publisher);
}
- return publisher;
+
+ SystemMetricsPublisher combinedPublisher =
+ new CombinedSystemMetricsPublisher(publishers);
+ return combinedPublisher;
}
// sanity check for configurations
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java
new file mode 100644
index 0000000..9646747
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * A metrics publisher that can publish for a collection of publishers.
+ */
+public class CombinedSystemMetricsPublisher implements SystemMetricsPublisher {
+ private Collection<SystemMetricsPublisher> publishers;
+
+ public CombinedSystemMetricsPublisher(Collection<SystemMetricsPublisher>
+ publishers) {
+ this.publishers = publishers;
+ }
+
+ @Override
+ public void appCreated(RMApp app, long createdTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.appCreated(app, createdTime);
+ }
+ }
+
+ @Override
+ public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.appACLsUpdated(app, appViewACLs, updatedTime);
+ }
+ }
+
+ @Override
+ public void appUpdated(RMApp app, long updatedTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.appUpdated(app, updatedTime);
+ }
+ }
+
+ @Override
+ public void appStateUpdated(RMApp app, YarnApplicationState appState,
+ long updatedTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.appStateUpdated(app, appState, updatedTime);
+ }
+ }
+
+ @Override
+ public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.appFinished(app, state, finishedTime);
+ }
+ }
+
+ @Override
+ public void appAttemptRegistered(RMAppAttempt appAttempt,
+ long registeredTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.appAttemptRegistered(appAttempt, registeredTime);
+ }
+ }
+
+ @Override
+ public void appAttemptFinished(RMAppAttempt appAttempt,
+ RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.appAttemptFinished(appAttempt, appAttemtpState, app,
+ finishedTime);
+ }
+ }
+
+ @Override
+ public void containerCreated(RMContainer container, long createdTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.containerCreated(container, createdTime);
+ }
+ }
+
+ @Override
+ public void containerFinished(RMContainer container, long finishedTime) {
+ for (SystemMetricsPublisher publisher : this.publishers) {
+ publisher.containerFinished(container, finishedTime);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java
new file mode 100644
index 0000000..f824fa1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that the RM creates timeline services (v1/v2) as specified by the
+ * configuration.
+ */
+public class TestRMTimelineService {
+ private static MockRM rm;
+
+ private void setup(boolean v1Enabled, boolean v2Enabled) {
+ Configuration conf = new YarnConfiguration(new Configuration(false));
+ Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf));
+
+ if (v1Enabled || v2Enabled) {
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ }
+
+ if (v1Enabled) {
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+ }
+
+ if (v2Enabled) {
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+ conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+ }
+
+ if (v1Enabled && v2Enabled) {
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
+ }
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ rm = new MockRM(conf, memStore);
+ rm.start();
+ }
+
+ // validate RM services exist or not as we specified
+ private void validate(boolean v1Enabled, boolean v2Enabled) {
+ boolean v1PublisherServiceFound = false;
+ boolean v2PublisherServiceFound = false;
+ List<Service> services = rm.getServices();
+ for (Service service : services) {
+ if (service instanceof TimelineServiceV1Publisher) {
+ v1PublisherServiceFound = true;
+ } else if (service instanceof TimelineServiceV2Publisher) {
+ v2PublisherServiceFound = true;
+ }
+ }
+
+ Assert.assertEquals(v1Enabled, v1PublisherServiceFound);
+ Assert.assertEquals(v2Enabled, v2PublisherServiceFound);
+ }
+
+ private void cleanup() throws Exception {
+ rm.close();
+ rm.stop();
+ }
+
+ // runs test to validate RM creates a timeline service publisher if and
+ // only if the service is enabled for v1 and v2 (independently).
+ private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception {
+ setup(v1Enabled, v2Enabled);
+ validate(v1Enabled, v2Enabled);
+ cleanup();
+ }
+
+ @Test
+ public void testTimelineServiceV1V2Enabled() throws Exception {
+ runTest(true, true);
+ }
+
+ @Test
+ public void testTimelineServiceV1Enabled() throws Exception {
+ runTest(true, false);
+ }
+
+ @Test
+ public void testTimelineServiceV2Enabled() throws Exception {
+ runTest(false, true);
+ }
+
+ @Test
+ public void testTimelineServiceDisabled() throws Exception {
+ runTest(false, false);
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
new file mode 100644
index 0000000..830d01c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
+import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that a CombinedSystemMetricsPublisher publishes metrics for timeline
+ * services (v1/v2) as specified by the configuration.
+ */
+public class TestCombinedSystemMetricsPublisher {
+ /**
+ * The folder where the FileSystemTimelineWriterImpl writes the entities.
+ */
+ private static File testRootDir = new File("target",
+ TestCombinedSystemMetricsPublisher.class.getName() + "-localDir")
+ .getAbsoluteFile();
+
+ private static ApplicationHistoryServer timelineServer;
+ private static CombinedSystemMetricsPublisher metricsPublisher;
+ private static TimelineStore store;
+ private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
+ private static RMTimelineCollectorManager rmTimelineCollectorManager;
+ private static DrainDispatcher dispatcher;
+ private static YarnConfiguration conf;
+ private static TimelineServiceV1Publisher publisherV1;
+ private static TimelineServiceV2Publisher publisherV2;
+ private static ApplicationAttemptId appAttemptId;
+ private static RMApp app;
+
+ private void testSetup(boolean enableV1, boolean enableV2) throws Exception {
+
+ if (testRootDir.exists()) {
+ //cleanup before hand
+ FileContext.getLocalFSFileContext().delete(
+ new Path(testRootDir.getAbsolutePath()), true);
+ }
+
+ conf = getConf(enableV1, enableV2);
+
+ RMContext rmContext = mock(RMContext.class);
+ rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
+ when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
+ ResourceManager rm = mock(ResourceManager.class);
+ when(rm.getRMContext()).thenReturn(rmContext);
+
+ if (enableV2) {
+ dispatcher = new DrainDispatcher();
+ rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
+ when(rmContext.getRMTimelineCollectorManager()).thenReturn(
+ rmTimelineCollectorManager);
+
+ rmTimelineCollectorManager.init(conf);
+ rmTimelineCollectorManager.start();
+ } else {
+ dispatcher = null;
+ rmTimelineCollectorManager = null;
+ }
+
+ timelineServer = new ApplicationHistoryServer();
+ timelineServer.init(conf);
+ timelineServer.start();
+ store = timelineServer.getTimelineStore();
+
+ if (enableV2) {
+ dispatcher.init(conf);
+ dispatcher.start();
+ }
+
+ List<SystemMetricsPublisher> publishers =
+ new ArrayList<SystemMetricsPublisher>();
+
+ if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
+ Assert.assertTrue(enableV1);
+ publisherV1 = new TimelineServiceV1Publisher();
+ publishers.add(publisherV1);
+ publisherV1.init(conf);
+ publisherV1.start();
+ } else {
+ Assert.assertFalse(enableV1);
+ publisherV1 = null;
+ }
+
+ if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+ Assert.assertTrue(enableV2);
+ publisherV2 = new TimelineServiceV2Publisher(
+ rmTimelineCollectorManager) {
+ @Override
+ protected Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+ };
+ publishers.add(publisherV2);
+ publisherV2.init(conf);
+ publisherV2.start();
+ } else {
+ Assert.assertFalse(enableV2);
+ publisherV2 = null;
+ }
+
+ if (publishers.isEmpty()) {
+ NoOpSystemMetricPublisher noopPublisher =
+ new NoOpSystemMetricPublisher();
+ publishers.add(noopPublisher);
+ }
+
+ metricsPublisher = new CombinedSystemMetricsPublisher(publishers);
+ }
+
+ private void testCleanup() throws Exception {
+ if (publisherV1 != null) {
+ publisherV1.stop();
+ }
+ if (publisherV2 != null) {
+ publisherV2.stop();
+ }
+ if (timelineServer != null) {
+ timelineServer.stop();
+ }
+ if (testRootDir.exists()) {
+ FileContext.getLocalFSFileContext().delete(
+ new Path(testRootDir.getAbsolutePath()), true);
+ }
+ if (rmTimelineCollectorManager != null) {
+ rmTimelineCollectorManager.stop();
+ }
+ }
+
+ private static YarnConfiguration getConf(boolean v1Enabled,
+ boolean v2Enabled) {
+ YarnConfiguration yarnConf = new YarnConfiguration();
+
+ if (v1Enabled || v2Enabled) {
+ yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ } else {
+ yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+ }
+
+ if (v1Enabled) {
+ yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+ yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
+ MemoryTimelineStore.class, TimelineStore.class);
+ yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
+ MemoryTimelineStateStore.class, TimelineStateStore.class);
+ }
+
+ if (v2Enabled) {
+ yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0");
+ yarnConf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+ true);
+ yarnConf.setBoolean(
+ YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, true);
+ yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+
+ try {
+ yarnConf.set(
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ testRootDir.getCanonicalPath());
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Exception while setting the " +
+ "TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
+ }
+ }
+
+ if (v1Enabled && v2Enabled) {
+ yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
+ yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
+ }
+
+ yarnConf.setInt(
+ YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
+
+ return yarnConf;
+ }
+
+ // runs test to validate timeline events are published if and only if the
+ // service is enabled for v1 and v2 (independently).
+ private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception {
+ testSetup(v1Enabled, v2Enabled);
+ publishEvents(v1Enabled, v2Enabled);
+ validateV1(v1Enabled);
+ validateV2(v2Enabled);
+ testCleanup();
+ }
+
+ @Test(timeout = 10000)
+ public void testTimelineServiceEventPublishingV1V2Enabled()
+ throws Exception {
+ runTest(true, true);
+ }
+
+ @Test(timeout = 10000)
+ public void testTimelineServiceEventPublishingV1Enabled() throws Exception {
+ runTest(true, false);
+ }
+
+ @Test(timeout = 10000)
+ public void testTimelineServiceEventPublishingV2Enabled() throws Exception {
+ runTest(false, true);
+ }
+
+ @Test(timeout = 10000)
+ public void testTimelineServiceEventPublishingNoService() throws Exception {
+ runTest(false, false);
+ }
+
+ private void publishEvents(boolean v1Enabled, boolean v2Enabled) {
+ long timestamp = (v1Enabled) ? 1 : 2;
+ int id = (v2Enabled) ? 3 : 4;
+ ApplicationId appId = ApplicationId.newInstance(timestamp, id);
+
+ app = createRMApp(appId);
+ rmAppsMapInContext.putIfAbsent(appId, app);
+
+ if (v2Enabled) {
+ AppLevelTimelineCollector collector =
+ new AppLevelTimelineCollector(appId);
+ rmTimelineCollectorManager.putIfAbsent(appId, collector);
+ }
+ appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ RMAppAttempt appAttempt = createRMAppAttempt(true);
+
+ metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
+ metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
+ app, Integer.MAX_VALUE + 2L);
+ if (v2Enabled) {
+ dispatcher.await();
+ }
+ }
+
+ private void validateV1(boolean v1Enabled) throws Exception {
+ TimelineEntity entity = null;
+
+ if (!v1Enabled) {
+ Thread.sleep(1000);
+ entity =
+ store.getEntity(appAttemptId.toString(),
+ AppAttemptMetricsConstants.ENTITY_TYPE,
+ EnumSet.allOf(Field.class));
+ Assert.assertNull(entity);
+ return;
+ }
+
+ do {
+ entity =
+ store.getEntity(appAttemptId.toString(),
+ AppAttemptMetricsConstants.ENTITY_TYPE,
+ EnumSet.allOf(Field.class));
+ Thread.sleep(100);
+ // ensure two events are both published before leaving the loop
+ } while (entity == null || entity.getEvents().size() < 2);
+
+ boolean hasRegisteredEvent = false;
+ boolean hasFinishedEvent = false;
+ for (org.apache.hadoop.yarn.api.records.timeline.TimelineEvent event :
+ entity.getEvents()) {
+ if (event.getEventType().equals(
+ AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
+ hasRegisteredEvent = true;
+ } else if (event.getEventType().equals(
+ AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
+ hasFinishedEvent = true;
+ Assert.assertEquals(
+ FinalApplicationStatus.UNDEFINED.toString(),
+ event.getEventInfo().get(
+ AppAttemptMetricsConstants.FINAL_STATUS_INFO));
+ Assert.assertEquals(
+ YarnApplicationAttemptState.FINISHED.toString(),
+ event.getEventInfo().get(
+ AppAttemptMetricsConstants.STATE_INFO));
+ }
+ Assert
+ .assertEquals(appAttemptId.toString(), entity.getEntityId());
+ }
+ Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
+ }
+
+ private void validateV2(boolean v2Enabled) throws Exception {
+ String outputDirApp =
+ getTimelineEntityDir() + "/"
+ + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
+
+ File entityFolder = new File(outputDirApp);
+ Assert.assertEquals(v2Enabled, entityFolder.isDirectory());
+
+ if (v2Enabled) {
+ String timelineServiceFileName = appAttemptId.toString()
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ File entityFile = new File(outputDirApp, timelineServiceFileName);
+ Assert.assertTrue(entityFile.exists());
+ long idPrefix = TimelineServiceHelper
+ .invertLong(appAttemptId.getAttemptId());
+ verifyEntity(entityFile, 2,
+ AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 0, idPrefix);
+ }
+ }
+
+ private void verifyEntity(File entityFile, long expectedEvents,
+ String eventForCreatedTime, long expectedMetrics, long idPrefix)
+ throws IOException {
+
+ BufferedReader reader = null;
+ String strLine;
+ long count = 0;
+ long metricsCount = 0;
+ try {
+ reader = new BufferedReader(new FileReader(entityFile));
+ while ((strLine = reader.readLine()) != null) {
+ if (strLine.trim().length() > 0) {
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+ entity = FileSystemTimelineReaderImpl
+ .getTimelineRecordFromJSON(strLine.trim(),
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
+ metricsCount = entity.getMetrics().size();
+ assertEquals(idPrefix, entity.getIdPrefix());
+ for (TimelineEvent event : entity.getEvents()) {
+ if (event.getId().equals(eventForCreatedTime)) {
+ assertTrue(entity.getCreatedTime() > 0);
+ break;
+ }
+ }
+ count++;
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals("Expected " + expectedEvents + " events to be published",
+ expectedEvents, count);
+ assertEquals("Expected " + expectedMetrics + " metrics is incorrect",
+ expectedMetrics, metricsCount);
+ }
+
+ private String getTimelineEntityDir() {
+ String outputDirApp =
+ testRootDir.getAbsolutePath() + "/"
+ + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+ + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+ + app.getUser() + "/"
+ + app.getName() + "/"
+ + TimelineUtils.DEFAULT_FLOW_VERSION + "/"
+ + app.getStartTime() + "/"
+ + app.getApplicationId();
+ return outputDirApp;
+ }
+
+ private static RMAppAttempt createRMAppAttempt(boolean unmanagedAMAttempt) {
+ RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+ when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
+ when(appAttempt.getHost()).thenReturn("test host");
+ when(appAttempt.getRpcPort()).thenReturn(-100);
+ if (!unmanagedAMAttempt) {
+ Container container = mock(Container.class);
+ when(container.getId())
+ .thenReturn(ContainerId.newContainerId(appAttemptId, 1));
+ when(appAttempt.getMasterContainer()).thenReturn(container);
+ }
+ when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
+ when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
+ when(appAttempt.getOriginalTrackingUrl()).thenReturn(
+ "test original tracking url");
+ return appAttempt;
+ }
+
+ private static RMApp createRMApp(ApplicationId appId) {
+ RMApp rmApp = mock(RMAppImpl.class);
+ when(rmApp.getApplicationId()).thenReturn(appId);
+ when(rmApp.getName()).thenReturn("test app");
+ when(rmApp.getApplicationType()).thenReturn("test app type");
+ when(rmApp.getUser()).thenReturn("testUser");
+ when(rmApp.getQueue()).thenReturn("test queue");
+ when(rmApp.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
+ when(rmApp.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
+ when(rmApp.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
+ when(rmApp.getDiagnostics()).thenReturn(
+ new StringBuilder("test diagnostics info"));
+ RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+ when(appAttempt.getAppAttemptId()).thenReturn(
+ ApplicationAttemptId.newInstance(appId, 1));
+ when(rmApp.getCurrentAppAttempt()).thenReturn(appAttempt);
+ when(rmApp.getFinalApplicationStatus()).thenReturn(
+ FinalApplicationStatus.UNDEFINED);
+ Map<String, Long> resourceMap = new HashMap<>();
+ resourceMap
+ .put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
+ resourceMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
+ Map<String, Long> preemptedMap = new HashMap<>();
+ preemptedMap
+ .put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
+ preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
+ when(rmApp.getRMAppMetrics()).thenReturn(
+ new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceMap,
+ preemptedMap));
+ when(rmApp.getApplicationTags()).thenReturn(
+ Collections.<String> emptySet());
+ ApplicationSubmissionContext appSubmissionContext =
+ mock(ApplicationSubmissionContext.class);
+ when(appSubmissionContext.getPriority())
+ .thenReturn(Priority.newInstance(0));
+
+ ContainerLaunchContext containerLaunchContext =
+ mock(ContainerLaunchContext.class);
+ when(containerLaunchContext.getCommands())
+ .thenReturn(Collections.singletonList("java -Xmx1024m"));
+ when(appSubmissionContext.getAMContainerSpec())
+ .thenReturn(containerLaunchContext);
+ when(rmApp.getApplicationPriority()).thenReturn(Priority.newInstance(10));
+ when(rmApp.getApplicationSubmissionContext())
+ .thenReturn(appSubmissionContext);
+ return rmApp;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d09058b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 8245fd6..cbb69f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -251,6 +251,15 @@ public class MiniYARNCluster extends CompositeService {
useFixedPorts = conf.getBoolean(
YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
+
+ if (!useFixedPorts) {
+ String hostname = MiniYARNCluster.getHostname();
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
+
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ hostname + ":" + ServerSocketUtil.getPort(9188, 10));
+ }
+
useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
@@ -808,12 +817,6 @@ public class MiniYARNCluster extends CompositeService {
}
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
- if (!useFixedPorts) {
- String hostname = MiniYARNCluster.getHostname();
- conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- hostname + ":" + ServerSocketUtil.getPort(9188, 10));
- }
appHistoryServer.init(conf);
super.serviceInit(conf);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org