You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/04/05 19:02:52 UTC
[hadoop] branch trunk updated: YARN-9382 Publish container killed,
paused and resumed events to ATSv2. Contributed by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27039a2 YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishek Modi.
27039a2 is described below
commit 27039a29ae403398182e615fa5c1d0cb91a54268
Author: Vrushali C <vr...@apache.org>
AuthorDate: Fri Apr 5 12:02:43 2019 -0700
YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishek Modi.
---
.../server/metrics/ContainerMetricsConstants.java | 9 ++
.../timelineservice/NMTimelinePublisher.java | 102 ++++++++++++++-
.../timelineservice/TestNMTimelinePublisher.java | 137 +++++++++++++++++++++
3 files changed, 247 insertions(+), 1 deletion(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 7d6fc92..8b2fb85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -35,6 +35,15 @@ public class ContainerMetricsConstants {
"YARN_RM_CONTAINER_CREATED";
// Event of this type will be emitted by NM.
+ public static final String PAUSED_EVENT_TYPE = "YARN_CONTAINER_PAUSED";
+
+ // Event of this type will be emitted by NM.
+ public static final String RESUMED_EVENT_TYPE = "YARN_CONTAINER_RESUMED";
+
+ // Event of this type will be emitted by NM.
+ public static final String KILLED_EVENT_TYPE = "YARN_CONTAINER_KILLED";
+
+ // Event of this type will be emitted by NM.
public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
// Event of this type will be emitted by RM.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index b2d9376..ba57495 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -25,6 +25,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -253,6 +256,95 @@ public class NMTimelinePublisher extends CompositeService {
}
@SuppressWarnings("unchecked")
+ private void publishContainerResumedEvent(
+ ContainerEvent event) {
+ if (publishNMContainerEvents) {
+ ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
+ ContainerId containerId = resumeEvent.getContainerID();
+ ContainerEntity entity = createContainerEntity(containerId);
+
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
+ entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+ resumeEvent.getDiagnostic());
+ entity.setInfo(entityInfo);
+
+ Container container = context.getContainers().get(containerId);
+ if (container != null) {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE);
+ tEvent.setTimestamp(event.getTimestamp());
+
+ long containerStartTime = container.getContainerStartTime();
+ entity.addEvent(tEvent);
+ entity
+ .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+ containerId.getApplicationAttemptId().getApplicationId()));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void publishContainerPausedEvent(
+ ContainerEvent event) {
+ if (publishNMContainerEvents) {
+ ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
+ ContainerId containerId = pauseEvent.getContainerID();
+ ContainerEntity entity = createContainerEntity(containerId);
+
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
+ entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+ pauseEvent.getDiagnostic());
+ entity.setInfo(entityInfo);
+
+ Container container = context.getContainers().get(containerId);
+ if (container != null) {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE);
+ tEvent.setTimestamp(event.getTimestamp());
+
+ long containerStartTime = container.getContainerStartTime();
+ entity.addEvent(tEvent);
+ entity
+ .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+ containerId.getApplicationAttemptId().getApplicationId()));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void publishContainerKilledEvent(
+ ContainerEvent event) {
+ if (publishNMContainerEvents) {
+ ContainerKillEvent killEvent = (ContainerKillEvent) event;
+ ContainerId containerId = killEvent.getContainerID();
+ ContainerEntity entity = createContainerEntity(containerId);
+
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
+ entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+ killEvent.getDiagnostic());
+ entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
+ killEvent.getContainerExitStatus());
+ entity.setInfo(entityInfo);
+
+ Container container = context.getContainers().get(containerId);
+ if (container != null) {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE);
+ tEvent.setTimestamp(event.getTimestamp());
+
+ long containerStartTime = container.getContainerStartTime();
+ entity.addEvent(tEvent);
+ entity
+ .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+ containerId.getApplicationAttemptId().getApplicationId()));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
long containerFinishTime, long containerStartTime) {
if (publishNMContainerEvents) {
@@ -384,7 +476,15 @@ public class NMTimelinePublisher extends CompositeService {
case INIT_CONTAINER:
publishContainerCreatedEvent(event);
break;
-
+ case KILL_CONTAINER:
+ publishContainerKilledEvent(event);
+ break;
+ case PAUSE_CONTAINER:
+ publishContainerPausedEvent(event);
+ break;
+ case RESUME_CONTAINER:
+ publishContainerResumedEvent(event);
+ break;
default:
LOG.debug("{} is not a desired ContainerEvent which needs to be "
+ " published by NMTimelinePublisher", event.getType());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index ae51f85..abd27ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -25,7 +25,11 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -45,6 +50,10 @@ import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.junit.Assert;
@@ -94,6 +103,19 @@ public class TestNMTimelinePublisher {
private Context createMockContext() {
Context context = mock(Context.class);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
+
+ ConcurrentMap<ContainerId, Container> containers =
+ new ConcurrentHashMap<>();
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+ Container container = mock(Container.class);
+ when(container.getContainerStartTime())
+ .thenReturn(System.currentTimeMillis());
+ containers.putIfAbsent(cId, container);
+ when(context.getContainers()).thenReturn(containers);
+
return context;
}
@@ -145,6 +167,121 @@ public class TestNMTimelinePublisher {
cId.getContainerId()), entity.getIdPrefix());
}
+ @Test
+ public void testPublishContainerPausedEvent() {
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+
+ ContainerEvent containerEvent =
+ new ContainerPauseEvent(cId, "test pause");
+
+ publisher.createTimelineClient(appId);
+ publisher.publishContainerEvent(containerEvent);
+ publisher.stopTimelineClient(appId);
+ dispatcher.await();
+
+ ContainerEntity cEntity = new ContainerEntity();
+ cEntity.setId(cId.toString());
+ TimelineEntity[] lastPublishedEntities =
+ timelineClient.getLastPublishedEntities();
+
+ Assert.assertNotNull(lastPublishedEntities);
+ Assert.assertEquals(1, lastPublishedEntities.length);
+ TimelineEntity entity = lastPublishedEntities[0];
+ Assert.assertEquals(cEntity, entity);
+
+ NavigableSet<TimelineEvent> events = entity.getEvents();
+ Assert.assertEquals(1, events.size());
+ Assert.assertEquals(ContainerMetricsConstants.PAUSED_EVENT_TYPE,
+ events.iterator().next().getId());
+
+ Map<String, Object> info = entity.getInfo();
+ Assert.assertTrue(
+ info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+ Assert.assertEquals("test pause",
+ info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+ }
+
+ @Test
+ public void testPublishContainerResumedEvent() {
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+
+ ContainerEvent containerEvent =
+ new ContainerResumeEvent(cId, "test resume");
+
+ publisher.createTimelineClient(appId);
+ publisher.publishContainerEvent(containerEvent);
+ publisher.stopTimelineClient(appId);
+ dispatcher.await();
+
+ ContainerEntity cEntity = new ContainerEntity();
+ cEntity.setId(cId.toString());
+ TimelineEntity[] lastPublishedEntities =
+ timelineClient.getLastPublishedEntities();
+
+ Assert.assertNotNull(lastPublishedEntities);
+ Assert.assertEquals(1, lastPublishedEntities.length);
+ TimelineEntity entity = lastPublishedEntities[0];
+ Assert.assertEquals(cEntity, entity);
+
+ NavigableSet<TimelineEvent> events = entity.getEvents();
+ Assert.assertEquals(1, events.size());
+ Assert.assertEquals(ContainerMetricsConstants.RESUMED_EVENT_TYPE,
+ events.iterator().next().getId());
+
+ Map<String, Object> info = entity.getInfo();
+ Assert.assertTrue(
+ info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+ Assert.assertEquals("test resume",
+ info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+ }
+
+ @Test
+ public void testPublishContainerKilledEvent() {
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+
+ ContainerEvent containerEvent =
+ new ContainerKillEvent(cId, 1, "test kill");
+
+ publisher.createTimelineClient(appId);
+ publisher.publishContainerEvent(containerEvent);
+ publisher.stopTimelineClient(appId);
+ dispatcher.await();
+
+ ContainerEntity cEntity = new ContainerEntity();
+ cEntity.setId(cId.toString());
+ TimelineEntity[] lastPublishedEntities =
+ timelineClient.getLastPublishedEntities();
+
+ Assert.assertNotNull(lastPublishedEntities);
+ Assert.assertEquals(1, lastPublishedEntities.length);
+ TimelineEntity entity = lastPublishedEntities[0];
+ Assert.assertEquals(cEntity, entity);
+
+ NavigableSet<TimelineEvent> events = entity.getEvents();
+ Assert.assertEquals(1, events.size());
+ Assert.assertEquals(ContainerMetricsConstants.KILLED_EVENT_TYPE,
+ events.iterator().next().getId());
+
+ Map<String, Object> info = entity.getInfo();
+ Assert.assertTrue(
+ info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+ Assert.assertEquals("test kill",
+ info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+ Assert.assertTrue(
+ info.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO));
+ Assert.assertEquals(1,
+ info.get(ContainerMetricsConstants.EXIT_STATUS_INFO));
+ }
+
@Test public void testContainerResourceUsage() {
ApplicationId appId = ApplicationId.newInstance(0, 1);
publisher.createTimelineClient(appId);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org