You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2015/12/11 20:19:29 UTC
[2/2] hadoop git commit: YARN-4356. Ensure the timeline service v.2
is disabled cleanly and has no impact when it's turned off. Contributed by
Sangjin Lee.
YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no
impact when it's turned off. Contributed by Sangjin Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/45510fc6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/45510fc6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/45510fc6
Branch: refs/heads/feature-YARN-2928
Commit: 45510fc6bdcfe48aa8aa192f511ebfdada370c78
Parents: 0801563
Author: Li Lu <gt...@apache.org>
Authored: Fri Dec 11 11:17:34 2015 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Fri Dec 11 11:17:34 2015 -0800
----------------------------------------------------------------------
.../jobhistory/JobHistoryEventHandler.java | 64 ++++---
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 11 +-
.../apache/hadoop/mapreduce/MRJobConfig.java | 5 -
.../src/main/resources/mapred-default.xml | 7 -
.../mapred/TestMRTimelineEventHandling.java | 5 +-
.../hadoop/mapreduce/v2/MiniMRYarnCluster.java | 2 +-
.../hadoop/yarn/conf/YarnConfiguration.java | 58 +++++-
.../distributedshell/ApplicationMaster.java | 191 ++++++++-----------
.../applications/distributedshell/Client.java | 16 --
.../distributedshell/TestDistributedShell.java | 14 +-
.../impl/pb/AllocateResponsePBImpl.java | 4 +-
.../hadoop/yarn/client/api/TimelineClient.java | 18 +-
.../client/api/impl/TimelineClientImpl.java | 11 +-
.../src/main/resources/yarn-default.xml | 5 +-
.../impl/pb/NodeHeartbeatRequestPBImpl.java | 10 +-
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 10 +-
.../hadoop/yarn/server/nodemanager/Context.java | 3 +-
.../yarn/server/nodemanager/NodeManager.java | 23 ++-
.../nodemanager/NodeStatusUpdaterImpl.java | 48 +++--
.../collectormanager/NMCollectorService.java | 10 +-
.../containermanager/ContainerManagerImpl.java | 59 ++++--
.../application/ApplicationImpl.java | 70 +++++--
.../monitor/ContainersMonitorImpl.java | 11 +-
.../timelineservice/NMTimelinePublisher.java | 49 ++---
.../TestContainerManagerRecovery.java | 11 +-
.../application/TestApplication.java | 3 +-
.../nodemanager/webapp/TestNMWebServices.java | 9 +-
.../ApplicationMasterService.java | 11 +-
.../server/resourcemanager/ClientRMService.java | 35 ++--
.../server/resourcemanager/RMAppManager.java | 7 +-
.../server/resourcemanager/ResourceManager.java | 33 ++--
.../resourcemanager/ResourceTrackerService.java | 21 +-
.../resourcemanager/amlauncher/AMLauncher.java | 15 +-
.../metrics/TimelineServiceV2Publisher.java | 2 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 16 +-
.../resourcemanager/TestClientRMService.java | 3 +
.../metrics/TestSystemMetricsPublisher.java | 2 +-
.../TestSystemMetricsPublisherForV2.java | 1 +
.../TestTimelineServiceClientIntegration.java | 30 ++-
.../PerNodeTimelineCollectorsAuxService.java | 15 +-
.../reader/TimelineReaderServer.java | 14 +-
...TestPerNodeTimelineCollectorsAuxService.java | 9 +-
.../reader/TestTimelineReaderServer.java | 3 +
.../reader/TestTimelineReaderWebServices.java | 2 +
...stTimelineReaderWebServicesHBaseStorage.java | 2 +
45 files changed, 540 insertions(+), 408 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index f52e654..19699fb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -30,7 +27,11 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,7 +47,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -56,9 +56,9 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
-import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -74,9 +74,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
@@ -122,20 +121,17 @@ public class JobHistoryEventHandler extends AbstractService
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
-
- // For posting entities in new timeline service in a non-blocking way
- // TODO YARN-3367 replace with event loop in TimelineClient.
- private static ExecutorService threadPool =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
- .build());
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
protected TimelineClient timelineClient;
- private boolean newTimelineServiceEnabled = false;
+ private boolean timelineServiceV2Enabled = false;
+
+ // For posting entities in new timeline service in a non-blocking way
+ // TODO YARN-3367 replace with event loop in TimelineClient.
+ private ExecutorService threadPool;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@@ -265,22 +261,26 @@ public class JobHistoryEventHandler extends AbstractService
// configuration status: off, on_with_v1 or on_with_v2.
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-
+ LOG.info("Emitting job history data to the timeline service is enabled");
+ if (YarnConfiguration.timelineServiceEnabled(conf)) {
+
timelineClient =
((MRAppMaster.RunningAppContext)context).getTimelineClient();
timelineClient.init(conf);
- newTimelineServiceEnabled = conf.getBoolean(
- MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
- MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
- LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
- LOG.info("Emitting job history data to the timeline server is enabled");
+ timelineServiceV2Enabled =
+ YarnConfiguration.timelineServiceV2Enabled(conf);
+ LOG.info("Timeline service is enabled; version: " +
+ YarnConfiguration.getTimelineServiceVersion(conf));
+ if (timelineServiceV2Enabled) {
+ // initialize the thread pool for v.2 timeline service
+ threadPool = createThreadPool();
+ }
} else {
LOG.info("Timeline service is not enabled");
}
} else {
- LOG.info("Emitting job history data to the timeline server is not enabled");
+ LOG.info("Emitting job history data to the timeline server is not " +
+ "enabled");
}
// Flag for setting
@@ -448,19 +448,27 @@ public class JobHistoryEventHandler extends AbstractService
if (timelineClient != null) {
timelineClient.stop();
}
- shutdownAndAwaitTermination();
+ if (threadPool != null) {
+ shutdownAndAwaitTermination();
+ }
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}
// TODO remove threadPool after adding non-blocking call in TimelineClient
- private static void shutdownAndAwaitTermination() {
+ private ExecutorService createThreadPool() {
+ return Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+ .build());
+ }
+
+ private void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
- LOG.error("ThreadPool did not terminate");
+ LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
@@ -622,7 +630,7 @@ public class JobHistoryEventHandler extends AbstractService
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
if (timelineClient != null) {
- if (newTimelineServiceEnabled) {
+ if (timelineServiceV2Enabled) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index dafb6e9..a7c9245 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1017,14 +1017,9 @@ public class MRAppMaster extends CompositeService {
this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
- && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-
- boolean newTimelineServiceEnabled = conf.getBoolean(
- MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
- MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
-
- if (newTimelineServiceEnabled) {
+ && YarnConfiguration.timelineServiceEnabled(conf)) {
+
+ if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// create new version TimelineClient
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 3ab6eeb..3d1e841 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -467,11 +467,6 @@ public interface MRJobConfig {
"mapreduce.job.emit-timeline-data";
public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
false;
-
- public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
- "mapreduce.job.new-timeline-service.enabled";
- public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
- false;
public static final String MR_PREFIX = "yarn.app.mapreduce.";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 0585234..6ece048 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -618,13 +618,6 @@
</description>
</property>
- <property>
- <name>mapreduce.job.new-timeline-service.enabled</name>
- <value>false</value>
- <description>Specifies if posting job and task events to new timeline service.
- </description>
-</property>
-
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>0</value>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index b3ea26e..7b322e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -161,11 +161,10 @@ public class TestMRTimelineEventHandling {
LOG.info("testMRNewTimelineServiceEventHandling start.");
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ // enable new timeline service
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
- // enable new timeline serivce in MR side
- conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
-
// enable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index 18a4c14..edb825d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -173,7 +173,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
boolean enableTimelineAuxService = false;
if (nmAuxServices != null) {
for (String nmAuxService: nmAuxServices) {
- if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) {
+ if (nmAuxService.equals(TIMELINE_AUX_SERVICE_NAME)) {
enableTimelineAuxService = true;
break;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0165593..6a3854a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -83,6 +83,10 @@ public class YarnConfiguration extends Configuration {
new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
NM_CLIENT_MAX_NM_PROXIES)
});
+ Configuration.addDeprecations(new DeprecationDelta[] {
+ new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+ SYSTEM_METRICS_PUBLISHER_ENABLED)
+ });
}
//Configurations
@@ -383,7 +387,8 @@ public class YarnConfiguration extends Configuration {
/**
* The setting that controls whether yarn system metrics is published on the
- * timeline server or not by RM. This configuration setting is for ATS V1
+ * timeline server or not by RM. This configuration setting is for ATS V1.
+ * This is now deprecated in favor of SYSTEM_METRICS_PUBLISHER_ENABLED.
*/
public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+ "system-metrics-publisher.enabled";
@@ -2344,13 +2349,52 @@ public class YarnConfiguration extends Configuration {
}
return clusterId;
}
-
- public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+
+ // helper methods for timeline service configuration
+ /**
+ * Returns whether the timeline service is enabled via configuration.
+ *
+ * @param conf the configuration
+ * @return whether the timeline service is enabled.
+ */
+ public static boolean timelineServiceEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
- && conf.getBoolean(
- YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
- YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+ }
+
+ /**
+ * Returns the timeline service version. It does not check whether the
+ * timeline service itself is enabled.
+ *
+ * @param conf the configuration
+ * @return the timeline service version as a float.
+ */
+ public static float getTimelineServiceVersion(Configuration conf) {
+ return conf.getFloat(TIMELINE_SERVICE_VERSION,
+ DEFAULT_TIMELINE_SERVICE_VERSION);
+ }
+
+ /**
+ * Returns whether the timeline service v.2 is enabled via configuration.
+ *
+ * @param conf the configuration
+ * @return whether the timeline service v.2 is enabled. V.2 refers to a
+ * version greater than or equal to 2 but smaller than 3.
+ */
+ public static boolean timelineServiceV2Enabled(Configuration conf) {
+ return timelineServiceEnabled(conf) &&
+ (int)getTimelineServiceVersion(conf) == 2;
+ }
+
+ /**
+ * Returns whether the system metrics publisher is enabled.
+ *
+ * @param conf the configuration
+ * @return whether the system metrics publisher is enabled.
+ */
+ public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+ return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+ YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
}
/* For debugging. mp configurations to system output as XML format. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index e27c947..380ba29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -217,14 +217,11 @@ public class ApplicationMaster {
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
- private boolean newTimelineService = false;
+ private boolean timelineServiceV2 = false;
// For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient.
- private static ExecutorService threadPool =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
- .build());
+ private ExecutorService threadPool;
// App Master configuration
// No. of containers to run shell command on
@@ -314,8 +311,10 @@ public class ApplicationMaster {
}
appMaster.run();
result = appMaster.finish();
-
- shutdownAndAwaitTermination();
+
+ if (appMaster.threadPool != null) {
+ appMaster.shutdownAndAwaitTermination();
+ }
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
LogManager.shutdown();
@@ -329,16 +328,22 @@ public class ApplicationMaster {
System.exit(2);
}
}
-
+
//TODO remove threadPool after adding non-blocking call in TimelineClient
- private static void shutdownAndAwaitTermination() {
+ private ExecutorService createThreadPool() {
+ return Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+ .build());
+ }
+
+ private void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
- LOG.error("ThreadPool did not terminate");
+ LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
@@ -404,8 +409,7 @@ public class ApplicationMaster {
"No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
opts.addOption("debug", false, "Dump out debug information");
- opts.addOption("timeline_service_version", true,
- "Version for timeline service");
+
opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args);
@@ -542,27 +546,15 @@ public class ApplicationMaster {
requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0"));
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
- if (cliParser.hasOption("timeline_service_version")) {
- String timelineServiceVersion =
- cliParser.getOptionValue("timeline_service_version", "v1");
- if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
- newTimelineService = false;
- } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) {
- newTimelineService = true;
- } else {
- throw new IllegalArgumentException(
- "timeline_service_version is not set properly, should be 'v1' or 'v2'");
- }
+ if (YarnConfiguration.timelineServiceEnabled(conf)) {
+ timelineServiceV2 =
+ YarnConfiguration.timelineServiceV2Enabled(conf);
+ if (timelineServiceV2) {
+ threadPool = createThreadPool();
}
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
- if (cliParser.hasOption("timeline_service_version")) {
- throw new IllegalArgumentException(
- "Timeline service is not enabled");
- }
}
return true;
@@ -623,16 +615,16 @@ public class ApplicationMaster {
nmClientAsync.start();
startTimelineClient(conf);
- // need to bind timelineClient
- amRMClient.registerTimelineClient(timelineClient);
+ if (timelineServiceV2) {
+ // need to bind timelineClient
+ amRMClient.registerTimelineClient(timelineClient);
+ }
if(timelineClient != null) {
- if (newTimelineService) {
- publishApplicationAttemptEventOnNewTimelineService(timelineClient,
- appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
- appSubmitterUgi);
+ if (timelineServiceV2) {
+ publishApplicationAttemptEventOnTimelineServiceV2(
+ DSEvent.DS_APP_ATTEMPT_START);
} else {
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
- DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
+ publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_START);
}
}
@@ -703,10 +695,9 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+ if (YarnConfiguration.timelineServiceEnabled(conf)) {
// Creating the Timeline Client
- if (newTimelineService) {
+ if (timelineServiceV2) {
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());
} else {
@@ -742,13 +733,11 @@ public class ApplicationMaster {
}
if (timelineClient != null) {
- if (newTimelineService) {
- publishApplicationAttemptEventOnNewTimelineService(timelineClient,
- appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId,
- appSubmitterUgi);
+ if (timelineServiceV2) {
+ publishApplicationAttemptEventOnTimelineServiceV2(
+ DSEvent.DS_APP_ATTEMPT_END);
} else {
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
- DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+ publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_END);
}
}
@@ -855,12 +844,10 @@ public class ApplicationMaster {
+ containerStatus.getContainerId());
}
if(timelineClient != null) {
- if (newTimelineService) {
- publishContainerEndEventOnNewTimelineService(
- timelineClient, containerStatus, domainId, appSubmitterUgi);
+ if (timelineServiceV2) {
+ publishContainerEndEventOnTimelineServiceV2(containerStatus);
} else {
- publishContainerEndEvent(
- timelineClient, containerStatus, domainId, appSubmitterUgi);
+ publishContainerEndEvent(containerStatus);
}
}
}
@@ -981,14 +968,11 @@ public class ApplicationMaster {
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
}
if(applicationMaster.timelineClient != null) {
- if (applicationMaster.newTimelineService) {
- ApplicationMaster.publishContainerStartEventOnNewTimelineService(
- applicationMaster.timelineClient, container,
- applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+ if (applicationMaster.timelineServiceV2) {
+ applicationMaster.publishContainerStartEventOnTimelineServiceV2(
+ container);
} else {
- ApplicationMaster.publishContainerStartEvent(
- applicationMaster.timelineClient, container,
- applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+ applicationMaster.publishContainerStartEvent(container);
}
}
}
@@ -1195,14 +1179,12 @@ public class ApplicationMaster {
}
}
- private static void publishContainerStartEvent(
- final TimelineClient timelineClient, Container container, String domainId,
- UserGroupInformation ugi) {
+ private void publishContainerStartEvent(Container container) {
final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.setDomainId(domainId);
- entity.addPrimaryFilter("user", ugi.getShortUserName());
+ entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@@ -1211,12 +1193,13 @@ public class ApplicationMaster {
entity.addEvent(event);
try {
- ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
- @Override
- public TimelinePutResponse run() throws Exception {
- return timelineClient.putEntities(entity);
- }
- });
+ appSubmitterUgi.doAs(
+ new PrivilegedExceptionAction<TimelinePutResponse>() {
+ @Override
+ public TimelinePutResponse run() throws Exception {
+ return timelineClient.putEntities(entity);
+ }
+ });
} catch (Exception e) {
LOG.error("Container start event could not be published for "
+ container.getId().toString(),
@@ -1224,14 +1207,12 @@ public class ApplicationMaster {
}
}
- private static void publishContainerEndEvent(
- final TimelineClient timelineClient, ContainerStatus container,
- String domainId, UserGroupInformation ugi) {
+ private void publishContainerEndEvent(ContainerStatus container) {
final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getContainerId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.setDomainId(domainId);
- entity.addPrimaryFilter("user", ugi.getShortUserName());
+ entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@@ -1246,14 +1227,12 @@ public class ApplicationMaster {
}
}
- private static void publishApplicationAttemptEvent(
- final TimelineClient timelineClient, String appAttemptId,
- DSEvent appEvent, String domainId, UserGroupInformation ugi) {
+ private void publishApplicationAttemptEvent(DSEvent appEvent) {
final TimelineEntity entity = new TimelineEntity();
- entity.setEntityId(appAttemptId);
+ entity.setEntityId(appAttemptID.toString());
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
entity.setDomainId(domainId);
- entity.addPrimaryFilter("user", ugi.getShortUserName());
+ entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setEventType(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
@@ -1264,7 +1243,7 @@ public class ApplicationMaster {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "
- + appAttemptId.toString(), e);
+ + appAttemptID, e);
}
}
@@ -1296,27 +1275,24 @@ public class ApplicationMaster {
return new Thread(runnableLaunchContainer);
}
- private static void publishContainerStartEventOnNewTimelineService(
- final TimelineClient timelineClient, final Container container,
- final String domainId, final UserGroupInformation ugi) {
+ private void publishContainerStartEventOnTimelineServiceV2(
+ final Container container) {
Runnable publishWrapper = new Runnable() {
public void run() {
- publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
- container, domainId, ugi);
+ publishContainerStartEventOnTimelineServiceV2Base(container);
}
};
threadPool.execute(publishWrapper);
}
- private static void publishContainerStartEventOnNewTimelineServiceBase(
- final TimelineClient timelineClient, Container container, String domainId,
- UserGroupInformation ugi) {
+ private void publishContainerStartEventOnTimelineServiceV2Base(
+ Container container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
- entity.addInfo("user", ugi.getShortUserName());
+ entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
@@ -1327,7 +1303,7 @@ public class ApplicationMaster {
entity.addEvent(event);
try {
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
@@ -1341,27 +1317,24 @@ public class ApplicationMaster {
}
}
- private static void publishContainerEndEventOnNewTimelineService(
- final TimelineClient timelineClient, final ContainerStatus container,
- final String domainId, final UserGroupInformation ugi) {
+ private void publishContainerEndEventOnTimelineServiceV2(
+ final ContainerStatus container) {
Runnable publishWrapper = new Runnable() {
public void run() {
- publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
- container, domainId, ugi);
+ publishContainerEndEventOnTimelineServiceV2Base(container);
}
};
threadPool.execute(publishWrapper);
}
- private static void publishContainerEndEventOnNewTimelineServiceBase(
- final TimelineClient timelineClient, final ContainerStatus container,
- final String domainId, final UserGroupInformation ugi) {
+ private void publishContainerEndEventOnTimelineServiceV2Base(
+ final ContainerStatus container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
- entity.addInfo("user", ugi.getShortUserName());
+ entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
@@ -1371,7 +1344,7 @@ public class ApplicationMaster {
entity.addEvent(event);
try {
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
@@ -1385,29 +1358,25 @@ public class ApplicationMaster {
}
}
- private static void publishApplicationAttemptEventOnNewTimelineService(
- final TimelineClient timelineClient, final String appAttemptId,
- final DSEvent appEvent, final String domainId,
- final UserGroupInformation ugi) {
+ private void publishApplicationAttemptEventOnTimelineServiceV2(
+ final DSEvent appEvent) {
Runnable publishWrapper = new Runnable() {
public void run() {
- publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
- appAttemptId, appEvent, domainId, ugi);
+ publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
}
};
threadPool.execute(publishWrapper);
}
- private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
- final TimelineClient timelineClient, String appAttemptId,
- DSEvent appEvent, String domainId, UserGroupInformation ugi) {
+ private void publishApplicationAttemptEventOnTimelineServiceV2Base(
+ DSEvent appEvent) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
- entity.setId(appAttemptId);
+ entity.setId(appAttemptID.toString());
entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
//entity.setDomainId(domainId);
- entity.addInfo("user", ugi.getShortUserName());
+ entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setId(appEvent.toString());
@@ -1415,7 +1384,7 @@ public class ApplicationMaster {
entity.addEvent(event);
try {
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
@@ -1426,7 +1395,7 @@ public class ApplicationMaster {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "
- + appAttemptId.toString(),
+ + appAttemptID,
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 2819c91..e66005e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -192,8 +192,6 @@ public class Client {
// Command line options
private Options opts;
- private String timelineServiceVersion;
-
private static final String shellCommandPath = "shellCommands";
private static final String shellArgsPath = "shellArgs";
private static final String appMasterJarPath = "AppMaster.jar";
@@ -269,7 +267,6 @@ public class Client {
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
- opts.addOption("timeline_service_version", true, "Version for timeline service");
opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application attempts." +
" If the flag is true, running containers will not be killed when" +
@@ -371,16 +368,6 @@ public class Client {
+ " Specified virtual cores=" + amVCores);
}
- if (cliParser.hasOption("timeline_service_version")) {
- timelineServiceVersion =
- cliParser.getOptionValue("timeline_service_version", "v1");
- if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") ||
- timelineServiceVersion.trim().equalsIgnoreCase("v2"))) {
- throw new IllegalArgumentException(
- "timeline_service_version is not set properly, should be 'v1' or 'v2'");
- }
- }
-
if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
}
@@ -690,9 +677,6 @@ public class Client {
vargs.add("--debug");
}
- if (timelineServiceVersion != null) {
- vargs.add("--timeline_service_version " + timelineServiceVersion);
- }
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index fe817c3..b3ff9b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -99,22 +99,19 @@ public class TestDistributedShell {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
if (!testName.getMethodName().toLowerCase().contains("v2")) {
// disable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
- conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
- true);
- conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false);
} else {
+ // set version to 2
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
// enable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
+ TIMELINE_AUX_SERVICE_NAME + ".class",
PerNodeTimelineCollectorsAuxService.class.getName());
- conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
- conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
- false);
}
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
@@ -245,12 +242,7 @@ public class TestDistributedShell {
}
boolean isTestingTimelineV2 = false;
if (timelineVersion.equalsIgnoreCase("v2")) {
- String[] timelineArgs = {
- "--timeline_service_version",
- "v2"
- };
isTestingTimelineV2 = true;
- args = mergeArgs(args, timelineArgs);
if (!defaultFlow) {
String[] flowArgs = {
"--flow_name",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 7176146..d096a6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -403,7 +403,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
@Override
- public Priority getApplicationPriority() {
+ public synchronized Priority getApplicationPriority() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.appPriority != null) {
return this.appPriority;
@@ -416,7 +416,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
@Override
- public void setApplicationPriority(Priority priority) {
+ public synchronized void setApplicationPriority(Priority priority) {
maybeInitBuilder();
if (priority == null)
builder.clearApplicationPriority();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index f2707ba..9772dc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -48,17 +48,21 @@ public abstract class TimelineClient extends AbstractService {
* current user may use {@link UserGroupInformation#doAs} another user to
* construct and initialize a timeline client if the following operations are
* supposed to be conducted by that user.
- *
- * @return a timeline client
*/
protected ApplicationId contextAppId;
+ /**
+ * Creates an instance of the timeline v.1.x client.
+ */
@Public
public static TimelineClient createTimelineClient() {
TimelineClient client = new TimelineClientImpl();
return client;
}
+ /**
+ * Creates an instance of the timeline v.2 client.
+ */
@Public
public static TimelineClient createTimelineClient(ApplicationId appId) {
TimelineClient client = new TimelineClientImpl(appId);
@@ -156,8 +160,9 @@ public abstract class TimelineClient extends AbstractService {
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
- * aggregator. It is a blocking API. The method will not return until all the
- * put entities have been persisted.
+ * service v.2 collector. It is a blocking API. The method will not return
+ * until all the put entities have been persisted. If this method is invoked
+ * for a non-v.2 timeline client instance, a YarnException is thrown.
* </p>
*
* @param entities
@@ -173,8 +178,9 @@ public abstract class TimelineClient extends AbstractService {
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
- * aggregator. It is an asynchronous API. The method will return once all the
- * entities are received.
+ * service v.2 collector. It is an asynchronous API. The method will return
+ * once all the entities are received. If this method is invoked for a
+ * non-v.2 timeline client instance, a YarnException is thrown.
* </p>
*
* @param entities
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 8312b6d..3a624ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -124,7 +124,7 @@ public class TimelineClientImpl extends TimelineClient {
private int maxServiceRetries;
private long serviceRetryInterval;
- private boolean newTimelineService = false;
+ private boolean timelineServiceV2 = false;
@Private
@VisibleForTesting
@@ -270,7 +270,7 @@ public class TimelineClientImpl extends TimelineClient {
public TimelineClientImpl(ApplicationId applicationId) {
super(TimelineClientImpl.class.getName(), applicationId);
- this.newTimelineService = true;
+ this.timelineServiceV2 = true;
}
protected void serviceInit(Configuration conf) throws Exception {
@@ -299,13 +299,13 @@ public class TimelineClientImpl extends TimelineClient {
new TimelineURLConnectionFactory()), cc);
TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
// TODO need to cleanup filter retry later.
- if (!newTimelineService) {
+ if (!timelineServiceV2) {
client.addFilter(retryFilter);
}
// The old version of the timeline service needs to get its address from the
// configuration, while the new version needs to auto-discover it (with retry).
- if (newTimelineService) {
+ if (timelineServiceV2) {
maxServiceRetries = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
@@ -353,6 +353,9 @@ public class TimelineClientImpl extends TimelineClient {
private void putEntities(boolean async,
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
+ if (!timelineServiceV2) {
+ throw new YarnException("v.2 method is invoked on a v.1.x client");
+ }
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entitiesContainer =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 99f84e6..a9adbbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -771,14 +771,15 @@
<property>
<description>The setting that controls whether yarn system metrics is
published to the Timeline server (version one) or not, by RM.
- This configuration is deprecated.</description>
+ This configuration is now deprecated in favor of
+ yarn.system-metrics-publisher.enabled.</description>
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
<property>
<description>The setting that controls whether yarn system metrics is
- published on the Timeline server (version two) or not by RM And NM.</description>
+ published on the Timeline service or not by RM And NM.</description>
<name>yarn.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index fa0cf5c..066abfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -261,10 +261,12 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private void initRegisteredCollectors() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList();
- this.registeredCollectors = new HashMap<ApplicationId, String> ();
- for (AppCollectorsMapProto c : list) {
- ApplicationId appId = convertFromProtoFormat(c.getAppId());
- this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+ if (!list.isEmpty()) {
+ this.registeredCollectors = new HashMap<>();
+ for (AppCollectorsMapProto c : list) {
+ ApplicationId appId = convertFromProtoFormat(c.getAppId());
+ this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 2521b9c..151006b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -531,10 +531,12 @@ public class NodeHeartbeatResponsePBImpl extends
private void initAppCollectorsMap() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<AppCollectorsMapProto> list = p.getAppCollectorsMapList();
- this.appCollectorsMap = new HashMap<ApplicationId, String> ();
- for (AppCollectorsMapProto c : list) {
- ApplicationId appId = convertFromProtoFormat(c.getAppId());
- this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+ if (!list.isEmpty()) {
+ this.appCollectorsMap = new HashMap<>();
+ for (AppCollectorsMapProto c : list) {
+ ApplicationId appId = convertFromProtoFormat(c.getAppId());
+ this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 0b378a1..8fce422 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -64,7 +64,8 @@ public interface Context {
/**
* Get the registered collectors that are located on this NM.
- * @return registered
+ * @return registered collectors, or null if the timeline service v.2 is not
+ * enabled
*/
Map<ApplicationId, String> getRegisteredCollectors();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 601bd04..da8a13a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -97,6 +97,7 @@ public class NodeManager extends CompositeService
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
+ // the NM collector service is set only if the timeline service v.2 is enabled
private NMCollectorService nmCollectorService;
private NodeStatusUpdater nodeStatusUpdater;
private NodeResourceMonitor nodeResourceMonitor;
@@ -356,8 +357,10 @@ public class NodeManager extends CompositeService
DefaultMetricsSystem.initialize("NodeManager");
- this.nmCollectorService = createNMCollectorService(context);
- addService(nmCollectorService);
+ if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+ this.nmCollectorService = createNMCollectorService(context);
+ addService(nmCollectorService);
+ }
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
@@ -457,8 +460,7 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
- protected Map<ApplicationId, String> registeredCollectors =
- new ConcurrentHashMap<ApplicationId, String>();
+ protected Map<ApplicationId, String> registeredCollectors;
protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@@ -484,6 +486,9 @@ public class NodeManager extends CompositeService
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
NMStateStoreService stateStore, Configuration conf) {
+ if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+ this.registeredCollectors = new ConcurrentHashMap<>();
+ }
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
@@ -709,7 +714,14 @@ public class NodeManager extends CompositeService
return this.context;
}
- // For testing
+ /**
+ * Returns the NM collector service. It should be used only for testing
+ * purposes.
+ *
+ * @return the NM collector service, or null if the timeline service v.2 is
+ * not enabled
+ */
+ @VisibleForTesting
NMCollectorService getNMCollectorService() {
return this.nmCollectorService;
}
@@ -717,6 +729,7 @@ public class NodeManager extends CompositeService
public static void main(String[] args) throws IOException {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
+ @SuppressWarnings("resource")
NodeManager nodeManager = new NodeManager();
Configuration conf = new YarnConfiguration();
new GenericOptionsParser(conf, args);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index d39204f..39c846c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -821,7 +821,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
dispatcher.getEventHandler().handle(
new CMgrSignalContainersEvent(containersToSignal));
}
- if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
+ if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
updateTimelineClientsAddress(response);
}
@@ -853,7 +853,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
/**
* Caller should take care of sending non null nodelabels for both
* arguments
- *
+ *
* @param nodeLabelsNew
* @param nodeLabelsOld
* @return if the New node labels are diff from the older one.
@@ -869,27 +869,37 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private void updateTimelineClientsAddress(
NodeHeartbeatResponse response) {
- Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
- response.getAppCollectorsMap().entrySet();
- for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
- ApplicationId appId = entry.getKey();
- String collectorAddr = entry.getValue();
-
- // Only handle applications running on local node.
- // Not include apps with timeline collectors running in local
- Application application = context.getApplications().get(appId);
- if (application != null &&
- !context.getRegisteredCollectors().containsKey(appId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sync a new collector address: " + collectorAddr +
- " for application: " + appId + " from RM.");
+ Map<ApplicationId, String> knownCollectorsMap =
+ response.getAppCollectorsMap();
+ if (knownCollectorsMap == null) {
+ LOG.warn("the collectors map is null");
+ } else {
+ Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
+ knownCollectorsMap.entrySet();
+ for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
+ ApplicationId appId = entry.getKey();
+ String collectorAddr = entry.getValue();
+
+ // Only handle applications running on the local node,
+ // excluding apps whose timeline collectors are running locally.
+ Application application = context.getApplications().get(appId);
+ // TODO this logic could be problematic if the collector address
+ // gets updated due to NM restart or collector service failure
+ if (application != null &&
+ !context.getRegisteredCollectors().containsKey(appId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sync a new collector address: " + collectorAddr +
+ " for application: " + appId + " from RM.");
+ }
+ TimelineClient client = application.getTimelineClient();
+ if (client != null) {
+ client.setTimelineServiceAddress(collectorAddr);
+ }
}
- TimelineClient client = application.getTimelineClient();
- client.setTimelineServiceAddress(collectorAddr);
}
}
}
-
+
private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index db79ee5..3ba81ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -43,6 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+/**
+ * Service that handles collector information. It is used only if the timeline
+ * service v.2 is enabled.
+ */
public class NMCollectorService extends CompositeService implements
CollectorNodemanagerProtocol {
@@ -113,9 +117,9 @@ public class NMCollectorService extends CompositeService implements
String collectorAddr = collector.getCollectorAddr();
newCollectorsMap.put(appId, collectorAddr);
// set registered collector address to TimelineClient.
- if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
- TimelineClient client =
- context.getApplications().get(appId).getTimelineClient();
+ TimelineClient client =
+ context.getApplications().get(appId).getTimelineClient();
+ if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index b010eee..e4668c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -190,7 +191,8 @@ public class ContainerManagerImpl extends CompositeService implements
private long waitForContainersOnShutdownMillis;
- private final NMTimelinePublisher nmMetricsPublisher;
+ // NM metrics publisher is set only if the timeline service v.2 is enabled
+ private NMTimelinePublisher nmMetricsPublisher;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -218,8 +220,15 @@ public class ContainerManagerImpl extends CompositeService implements
auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices);
- nmMetricsPublisher = createNMTimelinePublisher(context);
- context.setNMTimelinePublisher(nmMetricsPublisher);
+ // initialize the metrics publisher if the timeline service v.2 is enabled
+ // and the system publisher is enabled
+ Configuration conf = context.getConf();
+ if (YarnConfiguration.timelineServiceV2Enabled(conf) &&
+ YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+ LOG.info("YARN system metrics publishing service is enabled");
+ nmMetricsPublisher = createNMTimelinePublisher(context);
+ context.setNMTimelinePublisher(nmMetricsPublisher);
+ }
this.containersMonitor =
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
@@ -237,7 +246,6 @@ public class ContainerManagerImpl extends CompositeService implements
addService(dispatcher);
-
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -335,7 +343,7 @@ public class ContainerManagerImpl extends CompositeService implements
LOG.info("Recovering application " + appId);
//TODO: Recover flow and flow run ID
ApplicationImpl app = new ApplicationImpl(
- dispatcher, p.getUser(), null, null, 0, appId, creds, context);
+ dispatcher, p.getUser(), appId, creds, context);
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@@ -941,20 +949,27 @@ public class ContainerManagerImpl extends CompositeService implements
try {
if (!serviceStopped) {
// Create the application
- String flowName = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_NAME_TAG_PREFIX);
- String flowVersion = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_VERSION_TAG_PREFIX);
- String flowRunIdStr = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
- long flowRunId = 0L;
- if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
- flowRunId = Long.parseLong(flowRunIdStr);
+ // populate the flow context from the launch context if the timeline
+ // service v.2 is enabled
+ FlowContext flowContext = null;
+ if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+ String flowName = launchContext.getEnvironment().get(
+ TimelineUtils.FLOW_NAME_TAG_PREFIX);
+ String flowVersion = launchContext.getEnvironment().get(
+ TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+ String flowRunIdStr = launchContext.getEnvironment().get(
+ TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+ long flowRunId = 0L;
+ if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+ flowRunId = Long.parseLong(flowRunIdStr);
+ }
+ flowContext =
+ new FlowContext(flowName, flowVersion, flowRunId);
}
if (!context.getApplications().containsKey(applicationID)) {
Application application =
- new ApplicationImpl(dispatcher, user, flowName, flowVersion,
- flowRunId, applicationID, credentials, context);
+ new ApplicationImpl(dispatcher, user, flowContext,
+ applicationID, credentials, context);
if (context.getApplications().putIfAbsent(applicationID,
application) == null) {
LOG.info("Creating a new application reference for app "
@@ -1310,7 +1325,9 @@ public class ContainerManagerImpl extends CompositeService implements
Container c = containers.get(event.getContainerID());
if (c != null) {
c.handle(event);
- nmMetricsPublisher.publishContainerEvent(event);
+ if (nmMetricsPublisher != null) {
+ nmMetricsPublisher.publishContainerEvent(event);
+ }
} else {
LOG.warn("Event " + event + " sent to absent container " +
event.getContainerID());
@@ -1326,7 +1343,9 @@ public class ContainerManagerImpl extends CompositeService implements
event.getApplicationID());
if (app != null) {
app.handle(event);
- nmMetricsPublisher.publishApplicationEvent(event);
+ if (nmMetricsPublisher != null) {
+ nmMetricsPublisher.publishApplicationEvent(event);
+ }
} else {
LOG.warn("Event " + event + " sent to absent application "
+ event.getApplicationID());
@@ -1349,7 +1368,9 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public void handle(LocalizationEvent event) {
origLocalizationEventHandler.handle(event);
- timelinePublisher.publishLocalizationEvent(event);
+ if (timelinePublisher != null) {
+ timelinePublisher.publishLocalizationEvent(event);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6e87cfd..93c6758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -67,9 +67,8 @@ public class ApplicationImpl implements Application {
final Dispatcher dispatcher;
final String user;
- final String flowName;
- final String flowVersion;
- final long flowRunId;
+ // flow context is set only if the timeline service v.2 is enabled
+ private FlowContext flowContext;
final ApplicationId appId;
final Credentials credentials;
Map<ApplicationAccessType, String> applicationACLs;
@@ -86,14 +85,16 @@ public class ApplicationImpl implements Application {
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
- public ApplicationImpl(Dispatcher dispatcher, String user, String flowName,
- String flowVersion, long flowRunId, ApplicationId appId,
- Credentials credentials, Context context) {
+ public ApplicationImpl(Dispatcher dispatcher, String user,
+ ApplicationId appId, Credentials credentials, Context context) {
+ this(dispatcher, user, null, appId, credentials, context);
+ }
+
+ public ApplicationImpl(Dispatcher dispatcher, String user,
+ FlowContext flowContext, ApplicationId appId, Credentials credentials,
+ Context context) {
this.dispatcher = dispatcher;
this.user = user;
- this.flowName = flowName;
- this.flowVersion = flowVersion;
- this.flowRunId = flowRunId;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = context.getApplicationACLsManager();
@@ -103,11 +104,44 @@ public class ApplicationImpl implements Application {
writeLock = lock.writeLock();
stateMachine = stateMachineFactory.make(this);
Configuration conf = context.getConf();
- if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
- createAndStartTimelineClient(conf);
+ if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+ if (flowContext == null) {
+ throw new IllegalArgumentException("flow context cannot be null");
+ }
+ this.flowContext = flowContext;
+ if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+ createAndStartTimelineClient(conf);
+ }
}
}
-
+
+ /**
+ * Data object that encapsulates the flow context of an application.
+ */
+ public static class FlowContext {
+ private final String flowName;
+ private final String flowVersion;
+ private final long flowRunId;
+
+ public FlowContext(String flowName, String flowVersion, long flowRunId) {
+ this.flowName = flowName;
+ this.flowVersion = flowVersion;
+ this.flowRunId = flowRunId;
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public String getFlowVersion() {
+ return flowVersion;
+ }
+
+ public long getFlowRunId() {
+ return flowRunId;
+ }
+ }
+
private void createAndStartTimelineClient(Configuration conf) {
// create and start timeline client
this.timelineClient = TimelineClient.createTimelineClient(appId);
@@ -454,7 +488,11 @@ public class ApplicationImpl implements Application {
// Remove collectors info for finished apps.
// TODO check we remove related collectors info in failure cases
// (YARN-3038)
- app.context.getRegisteredCollectors().remove(app.getAppId());
+ Map<ApplicationId, String> registeredCollectors =
+ app.context.getRegisteredCollectors();
+ if (registeredCollectors != null) {
+ registeredCollectors.remove(app.getAppId());
+ }
// stop timelineClient when application get finished.
TimelineClient timelineClient = app.getTimelineClient();
if (timelineClient != null) {
@@ -521,16 +559,16 @@ public class ApplicationImpl implements Application {
@Override
public String getFlowName() {
- return flowName;
+ return flowContext == null ? null : flowContext.getFlowName();
}
@Override
public String getFlowVersion() {
- return flowVersion;
+ return flowContext == null ? null : flowContext.getFlowVersion();
}
@Override
public long getFlowRunId() {
- return flowRunId;
+ return flowContext == null ? 0L : flowContext.getFlowRunId();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/45510fc6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index dfa32ac..589cf75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
@@ -559,9 +560,13 @@ public class ContainersMonitorImpl extends AbstractService implements
ContainerImpl container =
(ContainerImpl) context.getContainers().get(containerId);
- container.getNMTimelinePublisher().reportContainerResourceUsage(
- container, currentTime, pId, currentPmemUsage,
- cpuUsageTotalCoresPercentage);
+ NMTimelinePublisher nmMetricsPublisher =
+ container.getNMTimelinePublisher();
+ if (nmMetricsPublisher != null) {
+ nmMetricsPublisher.reportContainerResourceUsage(
+ container, currentTime, pId, currentPmemUsage,
+ cpuUsageTotalCoresPercentage);
+ }
} catch (Exception e) {
// Log the exception and proceed to the next container.
LOG.warn("Uncaught exception in ContainersMonitorImpl "