You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/04/05 19:02:52 UTC

[hadoop] branch trunk updated: YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishesk Modi.

This is an automated email from the ASF dual-hosted git repository.

vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27039a2  YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishesk Modi.
27039a2 is described below

commit 27039a29ae403398182e615fa5c1d0cb91a54268
Author: Vrushali C <vr...@apache.org>
AuthorDate: Fri Apr 5 12:02:43 2019 -0700

    YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishesk Modi.
---
 .../server/metrics/ContainerMetricsConstants.java  |   9 ++
 .../timelineservice/NMTimelinePublisher.java       | 102 ++++++++++++++-
 .../timelineservice/TestNMTimelinePublisher.java   | 137 +++++++++++++++++++++
 3 files changed, 247 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 7d6fc92..8b2fb85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -35,6 +35,15 @@ public class ContainerMetricsConstants {
       "YARN_RM_CONTAINER_CREATED";
 
+  // Event of this type will be emitted by NM on a PAUSE_CONTAINER event.
+  public static final String PAUSED_EVENT_TYPE = "YARN_CONTAINER_PAUSED";
+
+  // Event of this type will be emitted by NM on a RESUME_CONTAINER event.
+  public static final String RESUMED_EVENT_TYPE = "YARN_CONTAINER_RESUMED";
+
+  // Event of this type will be emitted by NM on a KILL_CONTAINER event.
+  public static final String KILLED_EVENT_TYPE = "YARN_CONTAINER_KILLED";
+
   // Event of this type will be emitted by NM.
   public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
 
   // Event of this type will be emitted by RM.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index b2d9376..ba57495 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -25,6 +25,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -253,6 +256,95 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   @SuppressWarnings("unchecked")
+  private void publishContainerResumedEvent(
+      ContainerEvent event) {
+    if (publishNMContainerEvents) {
+      ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
+      ContainerId containerId = resumeEvent.getContainerID();
+      ContainerEntity entity = createContainerEntity(containerId);
+
+      Map<String, Object> entityInfo = new HashMap<String, Object>();
+      entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+          resumeEvent.getDiagnostic());
+      entity.setInfo(entityInfo);
+
+      Container container = context.getContainers().get(containerId);
+      if (container != null) {
+        TimelineEvent tEvent = new TimelineEvent();
+        tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE);
+        tEvent.setTimestamp(event.getTimestamp());
+
+        long containerStartTime = container.getContainerStartTime();
+        entity.addEvent(tEvent);
+        entity
+            .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+        dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+            containerId.getApplicationAttemptId().getApplicationId()));
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void publishContainerPausedEvent(
+      ContainerEvent event) {
+    if (publishNMContainerEvents) {
+      ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
+      ContainerId containerId = pauseEvent.getContainerID();
+      ContainerEntity entity = createContainerEntity(containerId);
+
+      Map<String, Object> entityInfo = new HashMap<String, Object>();
+      entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+          pauseEvent.getDiagnostic());
+      entity.setInfo(entityInfo);
+
+      Container container = context.getContainers().get(containerId);
+      if (container != null) {
+        TimelineEvent tEvent = new TimelineEvent();
+        tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE);
+        tEvent.setTimestamp(event.getTimestamp());
+
+        long containerStartTime = container.getContainerStartTime();
+        entity.addEvent(tEvent);
+        entity
+            .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+        dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+            containerId.getApplicationAttemptId().getApplicationId()));
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void publishContainerKilledEvent(
+      ContainerEvent event) {
+    if (publishNMContainerEvents) {
+      ContainerKillEvent killEvent = (ContainerKillEvent) event;
+      ContainerId containerId = killEvent.getContainerID();
+      ContainerEntity entity = createContainerEntity(containerId);
+
+      Map<String, Object> entityInfo = new HashMap<String, Object>();
+      entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+          killEvent.getDiagnostic());
+      entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
+          killEvent.getContainerExitStatus());
+      entity.setInfo(entityInfo);
+
+      Container container = context.getContainers().get(containerId);
+      if (container != null) {
+        TimelineEvent tEvent = new TimelineEvent();
+        tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE);
+        tEvent.setTimestamp(event.getTimestamp());
+
+        long containerStartTime = container.getContainerStartTime();
+        entity.addEvent(tEvent);
+        entity
+            .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+        dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+            containerId.getApplicationAttemptId().getApplicationId()));
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
   private void publishContainerFinishedEvent(ContainerStatus containerStatus,
       long containerFinishTime, long containerStartTime) {
     if (publishNMContainerEvents) {
@@ -384,7 +476,15 @@ public class NMTimelinePublisher extends CompositeService {
     case INIT_CONTAINER:
       publishContainerCreatedEvent(event);
       break;
-
+    case KILL_CONTAINER:
+      publishContainerKilledEvent(event);
+      break;
+    case PAUSE_CONTAINER:
+      publishContainerPausedEvent(event);
+      break;
+    case RESUME_CONTAINER:
+      publishContainerResumedEvent(event);
+      break;
     default:
       LOG.debug("{} is not a desired ContainerEvent which needs to be "
             + " published by NMTimelinePublisher", event.getType());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index ae51f85..abd27ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -25,7 +25,11 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -45,6 +50,10 @@ import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.TimelineServiceHelper;
 import org.junit.Assert;
@@ -94,6 +103,19 @@ public class TestNMTimelinePublisher {
   private Context createMockContext() {
     Context context = mock(Context.class);
     when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
+
+    // One known container: NMTimelinePublisher reads its start time.
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0, 1), 1);
+    ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
+    Container container = mock(Container.class);
+    when(container.getContainerStartTime())
+        .thenReturn(System.currentTimeMillis());
+    ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<>();
+    containers.put(containerId, container);
+    when(context.getContainers()).thenReturn(containers);
+
     return context;
   }
 
@@ -145,6 +167,121 @@ public class TestNMTimelinePublisher {
         cId.getContainerId()), entity.getIdPrefix());
   }
 
+  @Test
+  public void testPublishContainerPausedEvent() {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerEvent containerEvent =
+        new ContainerPauseEvent(cId, "test pause");
+
+    publisher.createTimelineClient(appId);
+    publisher.publishContainerEvent(containerEvent);
+    publisher.stopTimelineClient(appId);
+    dispatcher.await();
+
+    ContainerEntity cEntity = new ContainerEntity();
+    cEntity.setId(cId.toString());
+    TimelineEntity[] lastPublishedEntities =
+        timelineClient.getLastPublishedEntities();
+
+    Assert.assertNotNull(lastPublishedEntities);
+    Assert.assertEquals(1, lastPublishedEntities.length);
+    TimelineEntity entity = lastPublishedEntities[0];
+    Assert.assertEquals(cEntity, entity);
+
+    NavigableSet<TimelineEvent> events = entity.getEvents();
+    Assert.assertEquals(1, events.size());
+    TimelineEvent event = events.first();
+    Assert.assertEquals(ContainerMetricsConstants.PAUSED_EVENT_TYPE,
+        event.getId());
+    // Published event must keep the timestamp of the originating event.
+    Assert.assertEquals(containerEvent.getTimestamp(), event.getTimestamp());
+
+    Map<String, Object> info = entity.getInfo();
+    Assert.assertEquals("test pause",
+        info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+  }
+
+  @Test
+  public void testPublishContainerResumedEvent() {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerEvent containerEvent =
+        new ContainerResumeEvent(cId, "test resume");
+
+    publisher.createTimelineClient(appId);
+    publisher.publishContainerEvent(containerEvent);
+    publisher.stopTimelineClient(appId);
+    dispatcher.await();
+
+    ContainerEntity cEntity = new ContainerEntity();
+    cEntity.setId(cId.toString());
+    TimelineEntity[] lastPublishedEntities =
+        timelineClient.getLastPublishedEntities();
+
+    Assert.assertNotNull(lastPublishedEntities);
+    Assert.assertEquals(1, lastPublishedEntities.length);
+    TimelineEntity entity = lastPublishedEntities[0];
+    Assert.assertEquals(cEntity, entity);
+
+    NavigableSet<TimelineEvent> events = entity.getEvents();
+    Assert.assertEquals(1, events.size());
+    TimelineEvent event = events.first();
+    Assert.assertEquals(ContainerMetricsConstants.RESUMED_EVENT_TYPE,
+        event.getId());
+    // Published event must keep the timestamp of the originating event.
+    Assert.assertEquals(containerEvent.getTimestamp(), event.getTimestamp());
+
+    Map<String, Object> info = entity.getInfo();
+    Assert.assertEquals("test resume",
+        info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+  }
+
+  @Test
+  public void testPublishContainerKilledEvent() {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerEvent containerEvent =
+        new ContainerKillEvent(cId, 1, "test kill");
+
+    publisher.createTimelineClient(appId);
+    publisher.publishContainerEvent(containerEvent);
+    publisher.stopTimelineClient(appId);
+    dispatcher.await();
+
+    ContainerEntity cEntity = new ContainerEntity();
+    cEntity.setId(cId.toString());
+    TimelineEntity[] lastPublishedEntities =
+        timelineClient.getLastPublishedEntities();
+
+    Assert.assertNotNull(lastPublishedEntities);
+    Assert.assertEquals(1, lastPublishedEntities.length);
+    TimelineEntity entity = lastPublishedEntities[0];
+    Assert.assertEquals(cEntity, entity);
+
+    NavigableSet<TimelineEvent> events = entity.getEvents();
+    Assert.assertEquals(1, events.size());
+    TimelineEvent event = events.first();
+    Assert.assertEquals(ContainerMetricsConstants.KILLED_EVENT_TYPE,
+        event.getId());
+    // Published event must keep the timestamp of the originating event.
+    Assert.assertEquals(containerEvent.getTimestamp(), event.getTimestamp());
+
+    Map<String, Object> info = entity.getInfo();
+    Assert.assertEquals("test kill",
+        info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+    // Unlike pause/resume, a kill also records the exit status it was
+    // issued with.
+    Assert.assertEquals(1,
+        info.get(ContainerMetricsConstants.EXIT_STATUS_INFO));
+  }
+
   @Test public void testContainerResourceUsage() {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     publisher.createTimelineClient(appId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org