You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:28 UTC
[11/50] [abbrv] hadoop git commit: YARN-3045. Implement NM writing
container lifecycle events to Timeline Service v2. Contributed by
Naganarasimha G R.
YARN-3045. Implement NM writing container lifecycle events to Timeline Service v2. Contributed by Naganarasimha G R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/acbf140a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/acbf140a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/acbf140a
Branch: refs/heads/feature-YARN-2928
Commit: acbf140a929de268d8adb42696ba1a7fc4b0ed49
Parents: a87a00e
Author: Junping Du <ju...@apache.org>
Authored: Tue Aug 18 04:31:45 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:56 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../dev-support/findbugs-exclude.xml | 10 +
.../distributedshell/TestDistributedShell.java | 28 +-
.../hadoop/yarn/server/nodemanager/Context.java | 5 +
.../yarn/server/nodemanager/NodeManager.java | 13 +
.../containermanager/ContainerManagerImpl.java | 46 ++-
.../ApplicationContainerFinishedEvent.java | 17 +-
.../containermanager/container/Container.java | 3 +
.../container/ContainerImpl.java | 30 +-
.../monitor/ContainersMonitorImpl.java | 107 +-----
.../timelineservice/NMTimelineEvent.java | 31 ++
.../timelineservice/NMTimelineEventType.java | 24 ++
.../timelineservice/NMTimelinePublisher.java | 376 +++++++++++++++++++
.../nodemanager/TestNodeStatusUpdater.java | 24 +-
.../amrmproxy/BaseAMRMProxyTest.java | 9 +
.../containermanager/TestAuxServices.java | 4 +-
.../TestContainerManagerRecovery.java | 8 +
.../application/TestApplication.java | 7 +-
.../container/TestContainer.java | 2 +-
.../nodemanager/webapp/MockContainer.java | 6 +
.../nodemanager/webapp/TestNMWebServer.java | 3 +-
.../PerNodeTimelineCollectorsAuxService.java | 16 +-
...TestPerNodeTimelineCollectorsAuxService.java | 9 +
23 files changed, 646 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 42da97b..a36c288 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -91,6 +91,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3904. Refactor timelineservice.storage to add support to online and
offline aggregation writers (Li Lu via sjlee)
+ YARN-3045. Implement NM writing container lifecycle events to Timeline
+ Service v2. (Naganarasimha G R via junping_du)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index d36f245..68ed723 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -115,6 +115,16 @@
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
+ <!-- Object cast is based on the event type -->
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
+ <Bug pattern="BC_UNCONFIRMED_CAST" />
+ </Match>
+
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
+ <Bug pattern="BC_UNCONFIRMED_CAST" />
+ </Match>
<!-- Ignore intentional switch fallthroughs -->
<Match>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 8eac50f..fe817c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
@@ -211,7 +212,7 @@ public class TestDistributedShell {
testDSShell(false, "v2", false);
}
- public void testDSShell(boolean haveDomain, String timelineVersion,
+ private void testDSShell(boolean haveDomain, String timelineVersion,
boolean defaultFlow)
throws Exception {
String[] args = {
@@ -412,9 +413,32 @@ public class TestDistributedShell {
"container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- verifyEntityTypeFileExists(basePath,
+ File containerEntityFile = verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_CONTAINER.toString(),
containerMetricsTimestampFileName);
+ Assert.assertEquals(
+ "Container created event needs to be published atleast once",
+ 1,
+ getNumOfStringOccurences(containerEntityFile,
+ ContainerMetricsConstants.CREATED_EVENT_TYPE));
+
+ // to avoid race condition of testcase, atleast check 4 times with sleep
+ // of 500ms
+ long numOfContainerFinishedOccurences = 0;
+ for (int i = 0; i < 4; i++) {
+ numOfContainerFinishedOccurences =
+ getNumOfStringOccurences(containerEntityFile,
+ ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+ if (numOfContainerFinishedOccurences > 0) {
+ break;
+ } else {
+ Thread.sleep(500l);
+ }
+ }
+ Assert.assertEquals(
+ "Container finished event needs to be published atleast once",
+ 1,
+ numOfContainerFinishedOccurences);
// Verify RM posting Application life cycle Events are getting published
String appMetricsTimestampFileName =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 924860b..0b378a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
/**
@@ -96,4 +97,8 @@ public interface Context {
ConcurrentLinkedQueue<LogAggregationReport>
getLogAggregationStatusForApps();
+
+ void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher);
+
+ NMTimelinePublisher getNMTimelinePublisher();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index e871d73..57e649e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreServic
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -474,6 +475,8 @@ public class NodeManager extends CompositeService
private final ConcurrentLinkedQueue<LogAggregationReport>
logAggregationReportForApps;
+ private NMTimelinePublisher nmTimelinePublisher;
+
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -616,6 +619,16 @@ public class NodeManager extends CompositeService
Map<ApplicationId, String> newRegisteredCollectors) {
this.registeredCollectors.putAll(newRegisteredCollectors);
}
+
+ @Override
+ public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
+ this.nmTimelinePublisher = nmMetricsPublisher;
+ }
+
+ @Override
+ public NMTimelinePublisher getNMTimelinePublisher() {
+ return nmTimelinePublisher;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index da79446..12aab79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
@@ -142,6 +143,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -188,6 +190,8 @@ public class ContainerManagerImpl extends CompositeService implements
private long waitForContainersOnShutdownMillis;
+ private final NMTimelinePublisher nmMetricsPublisher;
+
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
@@ -214,6 +218,8 @@ public class ContainerManagerImpl extends CompositeService implements
auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices);
+ nmMetricsPublisher = createNMTimelinePublisher(context);
+ context.setNMTimelinePublisher(nmMetricsPublisher);
this.containersMonitor =
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
@@ -222,13 +228,16 @@ public class ContainerManagerImpl extends CompositeService implements
new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class,
new ApplicationEventDispatcher());
- dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
+ dispatcher.register(LocalizationEventType.class,
+ new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc,
+ nmMetricsPublisher));
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
addService(dispatcher);
+
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -351,7 +360,7 @@ public class ContainerManagerImpl extends CompositeService implements
Container container = new ContainerImpl(getConfig(), dispatcher,
context.getNMStateStore(), req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
- rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
+ rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), context);
context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
@@ -413,6 +422,13 @@ public class ContainerManagerImpl extends CompositeService implements
return new SharedCacheUploadService();
}
+ @VisibleForTesting
+ protected NMTimelinePublisher createNMTimelinePublisher(Context context) {
+ NMTimelinePublisher nmTimelinePublisherLocal = new NMTimelinePublisher(context);
+ addIfService(nmTimelinePublisherLocal);
+ return nmTimelinePublisherLocal;
+ }
+
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
@@ -910,7 +926,7 @@ public class ContainerManagerImpl extends CompositeService implements
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
context.getNMStateStore(), launchContext,
- credentials, metrics, containerTokenIdentifier);
+ credentials, metrics, containerTokenIdentifier, context);
ApplicationId applicationID =
containerId.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerId, container) != null) {
@@ -952,9 +968,9 @@ public class ContainerManagerImpl extends CompositeService implements
logAggregationContext));
}
- this.context.getNMStateStore().storeContainer(containerId, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
+ this.context.getNMStateStore().storeContainer(containerId, request);
this.context.getContainerTokenSecretManager().startContainerSuccessful(
containerTokenIdentifier);
@@ -1291,6 +1307,7 @@ public class ContainerManagerImpl extends CompositeService implements
Container c = containers.get(event.getContainerID());
if (c != null) {
c.handle(event);
+ nmMetricsPublisher.publishContainerEvent(event);
} else {
LOG.warn("Event " + event + " sent to absent container " +
event.getContainerID());
@@ -1299,7 +1316,6 @@ public class ContainerManagerImpl extends CompositeService implements
}
class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {
-
@Override
public void handle(ApplicationEvent event) {
Application app =
@@ -1307,6 +1323,7 @@ public class ContainerManagerImpl extends CompositeService implements
event.getApplicationID());
if (app != null) {
app.handle(event);
+ nmMetricsPublisher.publishApplicationEvent(event);
} else {
LOG.warn("Event " + event + " sent to absent application "
+ event.getApplicationID());
@@ -1314,6 +1331,25 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
+ private static final class LocalizationEventHandlerWrapper implements
+ EventHandler<LocalizationEvent> {
+
+ private EventHandler<LocalizationEvent> origLocalizationEventHandler;
+ private NMTimelinePublisher timelinePublisher;
+
+ LocalizationEventHandlerWrapper(EventHandler<LocalizationEvent> handler,
+ NMTimelinePublisher publisher) {
+ this.origLocalizationEventHandler = handler;
+ this.timelinePublisher = publisher;
+ }
+
+ @Override
+ public void handle(LocalizationEvent event) {
+ origLocalizationEventHandler.handle(event);
+ timelinePublisher.publishLocalizationEvent(event);
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
public void handle(ContainerManagerEvent event) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
index 6b8007f..9cd34cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
@@ -19,18 +19,23 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
public class ApplicationContainerFinishedEvent extends ApplicationEvent {
- private ContainerId containerID;
+ private ContainerStatus containerStatus;
- public ApplicationContainerFinishedEvent(
- ContainerId containerID) {
- super(containerID.getApplicationAttemptId().getApplicationId(),
+ public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
+ super(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
- this.containerID = containerID;
+ this.containerStatus = containerStatus;
}
public ContainerId getContainerID() {
- return this.containerID;
+ return containerStatus.getContainerId();
}
+
+ public ContainerStatus getContainerStatus() {
+ return containerStatus;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 1d2ec56..aac88fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -57,4 +58,6 @@ public interface Container extends EventHandler<ContainerEvent> {
String toString();
+ Priority getPriority();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 3c49489..0640350 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -44,12 +44,14 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -125,11 +128,12 @@ public class ContainerImpl implements Container {
RecoveredContainerStatus.REQUESTED;
// whether container was marked as killed after recovery
private boolean recoveredAsKilled = false;
+ private Context context;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
NMStateStoreService stateStore, ContainerLaunchContext launchContext,
Credentials creds, NodeManagerMetrics metrics,
- ContainerTokenIdentifier containerTokenIdentifier) {
+ ContainerTokenIdentifier containerTokenIdentifier, Context context) {
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.stateStore = stateStore;
@@ -144,8 +148,8 @@ public class ContainerImpl implements Container {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
-
stateMachine = stateMachineFactory.make(this);
+ this.context = context;
}
// constructor for a recovered container
@@ -154,9 +158,10 @@ public class ContainerImpl implements Container {
Credentials creds, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier,
RecoveredContainerStatus recoveredStatus, int exitCode,
- String diagnostics, boolean wasKilled, Resource recoveredCapability) {
+ String diagnostics, boolean wasKilled, Resource recoveredCapability,
+ Context context) {
this(conf, dispatcher, stateStore, launchContext, creds, metrics,
- containerTokenIdentifier);
+ containerTokenIdentifier, context);
this.recoveredStatus = recoveredStatus;
this.exitCode = exitCode;
this.recoveredAsKilled = wasKilled;
@@ -372,6 +377,10 @@ public class ContainerImpl implements Container {
}
}
+ public NMTimelinePublisher getNMTimelinePublisher() {
+ return context.getNMTimelinePublisher();
+ }
+
@Override
public String getUser() {
this.readLock.lock();
@@ -483,7 +492,10 @@ public class ContainerImpl implements Container {
// Inform the application
@SuppressWarnings("rawtypes")
EventHandler eventHandler = dispatcher.getEventHandler();
- eventHandler.handle(new ApplicationContainerFinishedEvent(containerId));
+
+ ContainerStatus containerStatus = cloneAndGetContainerStatus();
+ eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
+
// Remove the container from the resource-monitor
eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
// Tell the logService too
@@ -985,7 +997,8 @@ public class ContainerImpl implements Container {
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.releaseContainer(container.resource);
container.sendFinishedEvents();
- //if the current state is NEW it means the CONTAINER_INIT was never
+
+ // if the current state is NEW it means the CONTAINER_INIT was never
// sent for the event, thus no need to send the CONTAINER_STOP
if (container.getCurrentState()
!= org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
@@ -1176,4 +1189,9 @@ public class ContainerImpl implements Container {
LocalResourceRequest resource) {
return container.resourcesUploadPolicies.get(resource);
}
+
+ @Override
+ public Priority getPriority() {
+ return containerTokenIdentifier.getPriority();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 51b80bd..3b6b6c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -18,13 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
-import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
@@ -36,10 +32,6 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -48,13 +40,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ContainersMonitorImpl extends AbstractService implements
ContainersMonitor {
@@ -86,17 +78,11 @@ public class ContainersMonitorImpl extends AbstractService implements
private boolean pmemCheckEnabled;
private boolean vmemCheckEnabled;
private boolean containersMonitorEnabled;
-
- private boolean publishContainerMetricsToTimelineService;
private long maxVCoresAllottedForContainers;
private static final long UNKNOWN_MEMORY_LIMIT = -1L;
private int nodeCpuPercentageForYARN;
-
- // For posting entities in new timeline service in a non-blocking way
- // TODO replace with event loop in TimelineClient.
- private static ExecutorService threadPool;
@Private
public static enum ContainerMetric {
@@ -210,22 +196,6 @@ public class ContainersMonitorImpl extends AbstractService implements
1) + "). Thrashing might happen.");
}
}
-
- publishContainerMetricsToTimelineService =
- YarnConfiguration.systemMetricsPublisherEnabled(conf);
-
- if (publishContainerMetricsToTimelineService) {
- LOG.info("NodeManager has been configured to publish container " +
- "metrics to Timeline Service V2.");
- threadPool =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
- .build());
- } else {
- LOG.warn("NodeManager has not been configured to publish container " +
- "metrics to Timeline Service V2.");
- }
-
super.serviceInit(conf);
}
@@ -269,29 +239,8 @@ public class ContainersMonitorImpl extends AbstractService implements
}
}
- shutdownAndAwaitTermination();
-
super.serviceStop();
}
-
- // TODO remove threadPool after adding non-blocking call in TimelineClient
- private static void shutdownAndAwaitTermination() {
- if (threadPool == null) {
- return;
- }
- threadPool.shutdown();
- try {
- if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
- threadPool.shutdownNow();
- if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
- LOG.error("ThreadPool did not terminate");
- }
- } catch (InterruptedException ie) {
- threadPool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
@VisibleForTesting
static class ProcessTreeInfo {
@@ -470,9 +419,6 @@ public class ContainersMonitorImpl extends AbstractService implements
ContainerId containerId = entry.getKey();
ProcessTreeInfo ptInfo = entry.getValue();
- ContainerEntity entity = new ContainerEntity();
- entity.setId(containerId.toString());
-
try {
String pId = ptInfo.getPID();
@@ -556,26 +502,6 @@ public class ContainersMonitorImpl extends AbstractService implements
containerMetricsUnregisterDelayMs).recordCpuUsage
((int)cpuUsagePercentPerCore, milliVcoresUsed);
}
-
- if (publishContainerMetricsToTimelineService) {
- // if currentPmemUsage data is available
- if (currentPmemUsage !=
- ResourceCalculatorProcessTree.UNAVAILABLE) {
- TimelineMetric memoryMetric = new TimelineMetric();
- memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
- memoryMetric.addValue(currentTime, currentPmemUsage);
- entity.addMetric(memoryMetric);
- }
- // if cpuUsageTotalCoresPercentage data is available
- if (cpuUsageTotalCoresPercentage !=
- ResourceCalculatorProcessTree.UNAVAILABLE) {
- TimelineMetric cpuMetric = new TimelineMetric();
- cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
- cpuMetric.addValue(currentTime,
- cpuUsageTotalCoresPercentage);
- entity.addMetric(cpuMetric);
- }
- }
boolean isMemoryOverLimit = false;
String msg = "";
@@ -632,23 +558,16 @@ public class ContainersMonitorImpl extends AbstractService implements
LOG.info("Removed ProcessTree with root " + pId);
}
+ ContainerImpl container =
+ (ContainerImpl) context.getContainers().get(containerId);
+ container.getNMTimelinePublisher().reportContainerResourceUsage(
+ container, currentTime, pId, currentPmemUsage,
+ cpuUsageTotalCoresPercentage);
} catch (Exception e) {
// Log the exception and proceed to the next container.
LOG.warn("Uncaught exception in ContainersMonitorImpl "
+ "while monitoring resource of " + containerId, e);
}
-
- if (publishContainerMetricsToTimelineService) {
- try {
- TimelineClient timelineClient = context.getApplications().get(
- containerId.getApplicationAttemptId().getApplicationId()).
- getTimelineClient();
- putEntityWithoutBlocking(timelineClient, entity);
- } catch (Exception e) {
- LOG.error("Exception in ContainersMonitorImpl in putting " +
- "resource usage metrics to timeline service.", e);
- }
- }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Total Resource Usage stats in NM by all containers : "
@@ -671,20 +590,6 @@ public class ContainersMonitorImpl extends AbstractService implements
}
}
}
-
- private void putEntityWithoutBlocking(final TimelineClient timelineClient,
- final TimelineEntity entity) {
- Runnable publishWrapper = new Runnable() {
- public void run() {
- try {
- timelineClient.putEntities(entity);
- } catch (IOException|YarnException e) {
- LOG.error("putEntityNonBlocking get failed: " + e);
- }
- }
- };
- threadPool.execute(publishWrapper);
- }
private String formatErrorMessage(String memTypeExceeded,
long currentVmemUsage, long vmemLimit,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
new file mode 100644
index 0000000..af8d94c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
+ public NMTimelineEvent(NMTimelineEventType type) {
+ super(type);
+ }
+
+ public NMTimelineEvent(NMTimelineEventType type, long timestamp) {
+ super(type, timestamp);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
new file mode 100644
index 0000000..c1129af
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+public enum NMTimelineEventType {
+ // Publish the NM Timeline entity
+ TIMELINE_ENTITY_PUBLISH,
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
new file mode 100644
index 0000000..2c5c300
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+public class NMTimelinePublisher extends CompositeService {
+
+ private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class);
+
+ private Dispatcher dispatcher;
+ private boolean publishSystemMetrics;
+
+ private Context context;
+
+ private NodeId nodeId;
+
+ private String httpAddress;
+
+ public NMTimelinePublisher(Context context) {
+ super(NMTimelinePublisher.class.getName());
+ this.context = context;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ publishSystemMetrics =
+ YarnConfiguration.systemMetricsPublisherEnabled(conf);
+
+ if (publishSystemMetrics) {
+ dispatcher = new AsyncDispatcher();
+ dispatcher.register(NMTimelineEventType.class,
+ new ForwardingEventHandler());
+ dispatcher
+ .register(ContainerEventType.class, new ContainerEventHandler());
+ dispatcher.register(ApplicationEventType.class,
+ new ApplicationEventHandler());
+ dispatcher.register(LocalizationEventType.class,
+ new LocalizationEventDispatcher());
+ addIfService(dispatcher);
+ LOG.info("YARN system metrics publishing service is enabled");
+ } else {
+ LOG.info("YARN system metrics publishing service is not enabled");
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ // context will be updated after containerManagerImpl is started
+ // hence NMMetricsPublisher is added subservice of containerManagerImpl
+ this.nodeId = context.getNodeId();
+ this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort();
+ }
+
+ protected void handleNMTimelineEvent(NMTimelineEvent event) {
+ switch (event.getType()) {
+ case TIMELINE_ENTITY_PUBLISH:
+ putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(),
+ ((TimelinePublishEvent) event).getApplicationId());
+ break;
+ default:
+ LOG.error("Unknown NMTimelineEvent type: " + event.getType());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void reportContainerResourceUsage(Container container,
+ long createdTime, String pId, Long pmemUsage,
+ Float cpuUsageTotalCoresPercentage) {
+ if (publishSystemMetrics
+ && (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) {
+ ContainerEntity entity =
+ createContainerEntity(container.getContainerId());
+ long currentTimeMillis = System.currentTimeMillis();
+ if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
+ TimelineMetric memoryMetric = new TimelineMetric();
+ memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
+ memoryMetric.addValue(currentTimeMillis, pmemUsage);
+ entity.addMetric(memoryMetric);
+ }
+ if (cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE) {
+ TimelineMetric cpuMetric = new TimelineMetric();
+ cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
+ cpuMetric.addValue(currentTimeMillis, cpuUsageTotalCoresPercentage);
+ entity.addMetric(cpuMetric);
+ }
+ dispatcher.getEventHandler().handle(
+ new TimelinePublishEvent(entity, container.getContainerId()
+ .getApplicationAttemptId().getApplicationId()));
+ }
+ }
+
+ private void publishContainerCreatedEvent(ContainerEntity entity,
+ ContainerId containerId, Resource resource, Priority priority,
+ long timestamp) {
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+ resource.getMemory());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+ resource.getVirtualCores());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+ nodeId.getHost());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+ nodeId.getPort());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+ priority.toString());
+ entityInfo.put(
+ ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+ httpAddress);
+ entity.setInfo(entityInfo);
+
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+ tEvent.setTimestamp(timestamp);
+
+ entity.addEvent(tEvent);
+ putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
+ }
+
+ private void publishContainerFinishedEvent(ContainerStatus containerStatus,
+ long timeStamp) {
+ ContainerId containerId = containerStatus.getContainerId();
+ TimelineEntity entity = createContainerEntity(containerId);
+
+ Map<String, Object> eventInfo = new HashMap<String, Object>();
+ eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+ containerStatus.getDiagnostics());
+ eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+ containerStatus.getExitStatus());
+ eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, containerStatus
+ .getState().toString());
+
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+ tEvent.setTimestamp(timeStamp);
+ tEvent.setInfo(eventInfo);
+
+ entity.addEvent(tEvent);
+ putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
+ }
+
+ private static ContainerEntity createContainerEntity(ContainerId containerId) {
+ ContainerEntity entity = new ContainerEntity();
+ entity.setId(containerId.toString());
+ Identifier parentIdentifier = new Identifier();
+ parentIdentifier
+ .setType(TimelineEntityType.YARN_APPLICATION_ATTEMPT.name());
+ parentIdentifier.setId(containerId.getApplicationAttemptId().toString());
+ entity.setParent(parentIdentifier);
+ return entity;
+ }
+
+ private void putEntity(TimelineEntity entity, ApplicationId appId) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+ + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ }
+ TimelineClient timelineClient =
+ context.getApplications().get(appId).getTimelineClient();
+ timelineClient.putEntities(entity);
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity " + entity, e);
+ }
+ }
+
+ public void publishApplicationEvent(ApplicationEvent event) {
+ if (!publishSystemMetrics) {
+ return;
+ }
+ // publish only when the desired event is received
+ switch (event.getType()) {
+ case INIT_APPLICATION:
+ case FINISH_APPLICATION:
+ case APPLICATION_CONTAINER_FINISHED:
+ case APPLICATION_LOG_HANDLING_FAILED:
+ dispatcher.getEventHandler().handle(event);
+ break;
+
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(event.getType()
+ + " is not a desired ApplicationEvent which needs to be published by"
+ + " NMTimelinePublisher");
+ }
+ break;
+ }
+ }
+
+ public void publishContainerEvent(ContainerEvent event) {
+ if (!publishSystemMetrics) {
+ return;
+ }
+ // publish only when the desired event is received
+ switch (event.getType()) {
+ case INIT_CONTAINER:
+ dispatcher.getEventHandler().handle(event);
+ break;
+
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(event.getType()
+ + " is not a desired ContainerEvent which needs to be published by"
+ + " NMTimelinePublisher");
+ }
+ break;
+ }
+ }
+
+ public void publishLocalizationEvent(LocalizationEvent event) {
+ if (!publishSystemMetrics) {
+ return;
+ }
+ // publish only when the desired event is received
+ switch (event.getType()) {
+ case CONTAINER_RESOURCES_LOCALIZED:
+ case INIT_CONTAINER_RESOURCES:
+ dispatcher.getEventHandler().handle(event);
+ break;
+
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(event.getType()
+ + " is not a desired LocalizationEvent which needs to be published"
+ + " by NMTimelinePublisher");
+ }
+ break;
+ }
+ }
+
+ private class ApplicationEventHandler implements
+ EventHandler<ApplicationEvent> {
+ @Override
+ public void handle(ApplicationEvent event) {
+ switch (event.getType()) {
+ case APPLICATION_CONTAINER_FINISHED:
+ // this is actually used to publish the container Event
+ ApplicationContainerFinishedEvent evnt =
+ (ApplicationContainerFinishedEvent) event;
+ publishContainerFinishedEvent(evnt.getContainerStatus(),
+ event.getTimestamp());
+ break;
+ default:
+ LOG.error("Seems like event type is captured only in "
+ + "publishApplicationEvent method and not handled here");
+ break;
+ }
+ }
+ }
+
+ private class ContainerEventHandler implements EventHandler<ContainerEvent> {
+ @Override
+ public void handle(ContainerEvent event) {
+ ContainerId containerId = event.getContainerID();
+ Container container = context.getContainers().get(containerId);
+ long timestamp = event.getTimestamp();
+ ContainerEntity entity = createContainerEntity(containerId);
+
+ switch (event.getType()) {
+ case INIT_CONTAINER:
+ publishContainerCreatedEvent(entity, containerId,
+ container.getResource(), container.getPriority(), timestamp);
+ break;
+ default:
+ LOG.error("Seems like event type is captured only in "
+ + "publishContainerEvent method and not handled here");
+ break;
+ }
+ }
+ }
+
+ private static final class LocalizationEventDispatcher implements
+ EventHandler<LocalizationEvent> {
+ @Override
+ public void handle(LocalizationEvent event) {
+ switch (event.getType()) {
+ case INIT_CONTAINER_RESOURCES:
+ case CONTAINER_RESOURCES_LOCALIZED:
+ // TODO after priority based flush jira is finished
+ break;
+ default:
+ LOG.error("Seems like event type is captured only in "
+ + "publishLocalizationEvent method and not handled here");
+ break;
+ }
+ }
+ }
+
+ /**
+ * EventHandler implementation which forward events to NMMetricsPublisher.
+ * Making use of it, NMMetricsPublisher can avoid to have a public handle
+ * method.
+ */
+ private final class ForwardingEventHandler implements
+ EventHandler<NMTimelineEvent> {
+
+ @Override
+ public void handle(NMTimelineEvent event) {
+ handleNMTimelineEvent(event);
+ }
+ }
+
+ private static class TimelinePublishEvent extends NMTimelineEvent {
+ private ApplicationId appId;
+ private TimelineEntity entityToPublish;
+
+ public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
+ super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System
+ .currentTimeMillis());
+ this.appId = appId;
+ this.entityToPublish = entity;
+ }
+
+ public ApplicationId getApplicationId() {
+ return appId;
+ }
+
+ public TimelineEntity getTimelineEntityToPublish() {
+ return entityToPublish;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index dd76f60..009fd9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -252,8 +253,10 @@ public class TestNodeStatusUpdater {
firstContainerID, InetAddress.getByName("localhost")
.getCanonicalHostName(), 1234, user, resource,
currentTime + 10000, 123, "password".getBytes(), currentTime));
- Container container = new ContainerImpl(conf, mockDispatcher,
- stateStore, launchContext, null, mockMetrics, containerToken);
+ Container container =
+ new ContainerImpl(conf, mockDispatcher, stateStore, launchContext,
+ null, mockMetrics, containerToken,
+ mock(Context.class));
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@@ -291,8 +294,10 @@ public class TestNodeStatusUpdater {
secondContainerID, InetAddress.getByName("localhost")
.getCanonicalHostName(), 1234, user, resource,
currentTime + 10000, 123, "password".getBytes(), currentTime));
- Container container = new ContainerImpl(conf, mockDispatcher,
- stateStore, launchContext, null, mockMetrics, containerToken);
+ Container container =
+ new ContainerImpl(conf, mockDispatcher, stateStore, launchContext,
+ null, mockMetrics, containerToken,
+ mock(Context.class));
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@@ -1008,7 +1013,8 @@ public class TestNodeStatusUpdater {
"password".getBytes(), 0);
Container anyCompletedContainer = new ContainerImpl(conf, null,
null, null, null, null,
- BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+ BuilderUtils.newContainerTokenIdentifier(containerToken),
+ mock(Context.class)) {
@Override
public ContainerState getCurrentState() {
@@ -1029,7 +1035,7 @@ public class TestNodeStatusUpdater {
"password".getBytes(), 0);
Container runningContainer =
new ContainerImpl(conf, null, null, null, null, null,
- BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
+ BuilderUtils.newContainerTokenIdentifier(runningContainerToken), mock(Context.class)) {
@Override
public ContainerState getCurrentState() {
return ContainerState.RUNNING;
@@ -1085,9 +1091,10 @@ public class TestNodeStatusUpdater {
BuilderUtils.newContainerToken(containerId, "host", 1234, "user",
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
+
Container completedContainer = new ContainerImpl(conf, null,
null, null, null, null,
- BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+ BuilderUtils.newContainerTokenIdentifier(containerToken),mock(Context.class)) {
@Override
public ContainerState getCurrentState() {
return ContainerState.COMPLETE;
@@ -1124,7 +1131,8 @@ public class TestNodeStatusUpdater {
"password".getBytes(), 0);
Container anyCompletedContainer = new ContainerImpl(conf, null,
null, null, null, null,
- BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+ BuilderUtils.newContainerTokenIdentifier(containerToken),
+ mock(Context.class)) {
@Override
public ContainerState getCurrentState() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index bf4fd52..35f2d39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@@ -688,5 +689,13 @@ public abstract class BaseAMRMProxyTest {
return null;
}
+ @Override
+ public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
+ }
+
+ @Override
+ public NMTimelinePublisher getNMTimelinePublisher() {
+ return null;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
index 1380752..8c4bd25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
@@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.junit.Assert;
@@ -193,7 +195,7 @@ public class TestAuxServices {
ContainerId.newContainerId(attemptId, 1), "", "",
Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
Container container = new ContainerImpl(null, null, null, null, null,
- null, cti);
+ null, cti, mock(Context.class));
ContainerId containerId = container.getContainerId();
Resource resource = container.getResource();
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 31e2105..f29b791 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -101,10 +101,12 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreServic
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestContainerManagerRecovery extends BaseContainerManagerTest {
@@ -634,6 +636,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
boolean blockNewContainerRequests) {
// do nothing
}
+
+ @Override
+ public NMTimelinePublisher createNMTimelinePublisher(Context context) {
+ NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class);
+ return timelinePublisher;
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 002d4cf..38b3172f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -579,7 +582,7 @@ public class TestApplication {
public void containerFinished(int containerNum) {
app.handle(new ApplicationContainerFinishedEvent(containers.get(
- containerNum).getContainerId()));
+ containerNum).cloneAndGetContainerStatus()));
drainDispatcherEvents();
}
@@ -616,6 +619,8 @@ public class TestApplication {
when(c.getLaunchContext()).thenReturn(launchContext);
when(launchContext.getApplicationACLs()).thenReturn(
new HashMap<ApplicationAccessType, String>());
+ when(c.cloneAndGetContainerStatus()).thenReturn(BuilderUtils.newContainerStatus(cId,
+ ContainerState.NEW, "", 0, Resource.newInstance(1024, 1)));
return c;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 2ab9842..8a21ed2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -865,7 +865,7 @@ public class TestContainer {
when(ctxt.getServiceData()).thenReturn(serviceData);
c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
- ctxt, null, metrics, identifier);
+ ctxt, null, metrics, identifier, mock(Context.class));
dispatcher.register(ContainerEventType.class,
new EventHandler<ContainerEvent>() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 394a92c..37c726f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -144,4 +145,9 @@ public class MockContainer implements Container {
public NMContainerStatus getNMContainerStatus() {
return null;
}
+
+ @Override
+ public Priority getPriority() {
+ return Priority.UNDEFINED;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
index ed94fb6..e839f47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
@@ -220,7 +220,8 @@ public class TestNMWebServer {
Container container =
new ContainerImpl(conf, dispatcher, stateStore, launchContext,
null, metrics,
- BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+ BuilderUtils.newContainerTokenIdentifier(containerToken),
+ mock(Context.class)) {
@Override
public ContainerState getContainerState() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index befaa83..4147d42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -137,9 +137,19 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
// intercept the event of the AM container being stopped and remove the app
// level collector service
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
- ApplicationId appId = context.getContainerId().
- getApplicationAttemptId().getApplicationId();
- removeApplication(appId);
+ final ApplicationId appId =
+ context.getContainerId().getApplicationAttemptId().getApplicationId();
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ // TODO Temporary Fix until solution for YARN-3995 is finalized.
+ Thread.sleep(1000l);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ removeApplication(appId);
+ }
+ }).start();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acbf140a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index 7cc612d..dafc76e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -98,6 +98,15 @@ public class TestPerNodeTimelineCollectorsAuxService {
when(context.getContainerType()).thenReturn(
ContainerType.APPLICATION_MASTER);
auxService.stopContainer(context);
+
+ // TODO Temporary Fix until solution for YARN-3995 is finalized
+ for (int i = 0; i < 4; i++) {
+ Thread.sleep(500l);
+ if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
+ break;
+ }
+ }
+
// auxService should not have that app
assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
auxService.close();