Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2019/09/05 07:19:20 UTC

[hadoop] 05/06: YARN-6735. Have a way to turn off container metrics from NMs. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

rohithsharmaks pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 108c569e3b6545872c80f5ba4a1644e65228363d
Author: Rohith Sharma K S <ro...@apache.org>
AuthorDate: Tue Feb 5 13:47:56 2019 +0530

    YARN-6735. Have a way to turn off container metrics from NMs. Contributed by Abhishek Modi.
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  10 +
 .../src/main/resources/yarn-default.xml            |   8 +
 .../timelineservice/NMTimelinePublisher.java       | 276 +++++++++++----------
 .../timelineservice/TestNMTimelinePublisher.java   |   2 +
 4 files changed, 168 insertions(+), 128 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 20b96c2..fe93fda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1219,6 +1219,16 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS =
       "0.0.0.0:" + DEFAULT_NM_COLLECTOR_SERVICE_PORT;
 
+  /**
+   * The setting that controls whether yarn container events are published to
+   * the timeline service or not by NM. This configuration setting is for ATS
+   * V2
+   */
+  public static final String NM_PUBLISH_CONTAINER_EVENTS_ENABLED = NM_PREFIX
+      + "emit-container-events";
+  public static final boolean DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED =
+      true;
+
   /** Interval in between cache cleanups.*/
   public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
     NM_PREFIX + "localizer.cache.cleanup.interval-ms";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f99977e..876dd6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1186,6 +1186,14 @@
   </property>
 
   <property>
+    <description>The setting that controls whether yarn container events are
+      published to the timeline service or not by NM. This configuration setting
+      is for ATS V2.</description>
+    <name>yarn.nodemanager.emit-container-events</name>
+    <value>true</value>
+  </property>
+
+  <property>
     <description>Interval in between cache cleanups.</description>
     <name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
     <value>600000</value>
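
The new property above documents the default (enabled). As a hedged example of
the knob this change introduces, an operator who wants to stop the NM from
publishing container events and metrics to ATSv2 could override it in
yarn-site.xml (the override file is the standard mechanism and is not part of
this commit):

    <property>
      <name>yarn.nodemanager.emit-container-events</name>
      <value>false</value>
    </property>
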
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index cbf3e5e..e9bd965 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +89,8 @@ public class NMTimelinePublisher extends CompositeService {
 
   private final Map<ApplicationId, TimelineV2Client> appToClientMap;
 
+  private boolean publishNMContainerEvents = true;
+
   public NMTimelinePublisher(Context context) {
     super(NMTimelinePublisher.class.getName());
     this.context = context;
@@ -110,6 +113,10 @@ public class NMTimelinePublisher extends CompositeService {
     if (webAppURLWithoutScheme.contains(":")) {
       httpPort = webAppURLWithoutScheme.split(":")[1];
     }
+
+    publishNMContainerEvents = conf.getBoolean(
+        YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
+        YarnConfiguration.DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED);
     super.serviceInit(conf);
   }
 
@@ -155,31 +162,148 @@ public class NMTimelinePublisher extends CompositeService {
 
   public void reportContainerResourceUsage(Container container, Long pmemUsage,
       Float cpuUsagePercentPerCore) {
-    if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
-        cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
-      ContainerEntity entity =
-          createContainerEntity(container.getContainerId());
-      long currentTimeMillis = System.currentTimeMillis();
-      if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
-        TimelineMetric memoryMetric = new TimelineMetric();
-        memoryMetric.setId(ContainerMetric.MEMORY.toString());
-        memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
-        memoryMetric.addValue(currentTimeMillis, pmemUsage);
-        entity.addMetric(memoryMetric);
-      }
-      if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
-        TimelineMetric cpuMetric = new TimelineMetric();
-        cpuMetric.setId(ContainerMetric.CPU.toString());
-        // TODO: support average
-        cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
-        cpuMetric.addValue(currentTimeMillis,
-            Math.round(cpuUsagePercentPerCore));
-        entity.addMetric(cpuMetric);
+    if (publishNMContainerEvents) {
+      if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE
+          || cpuUsagePercentPerCore !=
+          ResourceCalculatorProcessTree.UNAVAILABLE) {
+        ContainerEntity entity =
+            createContainerEntity(container.getContainerId());
+        long currentTimeMillis = System.currentTimeMillis();
+        if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
+          TimelineMetric memoryMetric = new TimelineMetric();
+          memoryMetric.setId(ContainerMetric.MEMORY.toString());
+          memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+          memoryMetric.addValue(currentTimeMillis, pmemUsage);
+          entity.addMetric(memoryMetric);
+        }
+        if (cpuUsagePercentPerCore !=
+            ResourceCalculatorProcessTree.UNAVAILABLE) {
+          TimelineMetric cpuMetric = new TimelineMetric();
+          cpuMetric.setId(ContainerMetric.CPU.toString());
+          // TODO: support average
+          cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+          cpuMetric.addValue(currentTimeMillis,
+              Math.round(cpuUsagePercentPerCore));
+          entity.addMetric(cpuMetric);
+        }
+        entity.setIdPrefix(TimelineServiceHelper.
+            invertLong(container.getContainerStartTime()));
+        ApplicationId appId = container.getContainerId().
+            getApplicationAttemptId().getApplicationId();
+        try {
+          // no need to put it as part of publisher as timeline client
+          // already has Queuing concept
+          TimelineV2Client timelineClient = getTimelineClient(appId);
+          if (timelineClient != null) {
+            timelineClient.putEntitiesAsync(entity);
+          } else {
+            LOG.error("Seems like client has been removed before the container"
+                + " metric could be published for " +
+                container.getContainerId());
+          }
+        } catch (IOException e) {
+          LOG.error(
+              "Failed to publish Container metrics for container " +
+                  container.getContainerId());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to publish Container metrics for container " +
+                container.getContainerId(), e);
+          }
+        } catch (YarnException e) {
+          LOG.error(
+              "Failed to publish Container metrics for container " +
+                  container.getContainerId(), e.getMessage());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to publish Container metrics for container " +
+                container.getContainerId(), e);
+          }
+        }
       }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void publishContainerCreatedEvent(ContainerEvent event) {
+    if (publishNMContainerEvents) {
+      ContainerId containerId = event.getContainerID();
+      ContainerEntity entity = createContainerEntity(containerId);
+      Container container = context.getContainers().get(containerId);
+      Resource resource = container.getResource();
+
+      Map<String, Object> entityInfo = new HashMap<String, Object>();
+      entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
+          resource.getMemorySize());
+      entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
+          resource.getVirtualCores());
+      entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
+          nodeId.getHost());
+      entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
+          nodeId.getPort());
+      entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
+          container.getPriority().toString());
+      entityInfo.put(
+          ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
+          httpAddress);
+      entity.setInfo(entityInfo);
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+      tEvent.setTimestamp(event.getTimestamp());
+
+      long containerStartTime = container.getContainerStartTime();
+      entity.addEvent(tEvent);
+      entity.setCreatedTime(containerStartTime);
+      entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+      dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+          containerId.getApplicationAttemptId().getApplicationId()));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void publishContainerFinishedEvent(ContainerStatus containerStatus,
+      long containerFinishTime, long containerStartTime) {
+    if (publishNMContainerEvents) {
+      ContainerId containerId = containerStatus.getContainerId();
+      TimelineEntity entity = createContainerEntity(containerId);
+
+      Map<String, Object> entityInfo = new HashMap<String, Object>();
+      entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+          containerStatus.getDiagnostics());
+      entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
+          containerStatus.getExitStatus());
+      entityInfo.put(ContainerMetricsConstants.STATE_INFO,
+          ContainerState.COMPLETE.toString());
+      entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
+          containerFinishTime);
+      entity.setInfo(entityInfo);
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+      tEvent.setTimestamp(containerFinishTime);
+      entity.addEvent(tEvent);
+      entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+
+      dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+          containerId.getApplicationAttemptId().getApplicationId()));
+    }
+  }
+
+  private void publishContainerLocalizationEvent(
+      ContainerLocalizationEvent event, String eventType) {
+    if (publishNMContainerEvents) {
+      Container container = event.getContainer();
+      ContainerId containerId = container.getContainerId();
+      TimelineEntity entity = createContainerEntity(containerId);
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(eventType);
+      tEvent.setTimestamp(event.getTimestamp());
+      entity.addEvent(tEvent);
       entity.setIdPrefix(TimelineServiceHelper.
           invertLong(container.getContainerStartTime()));
-      ApplicationId appId = container.getContainerId().getApplicationAttemptId()
-          .getApplicationId();
+
+      ApplicationId appId = container.getContainerId().
+          getApplicationAttemptId().getApplicationId();
       try {
         // no need to put it as part of publisher as timeline client already has
         // Queuing concept
@@ -187,8 +311,8 @@ public class NMTimelinePublisher extends CompositeService {
         if (timelineClient != null) {
           timelineClient.putEntitiesAsync(entity);
         } else {
-          LOG.error("Seems like client has been removed before the container"
-              + " metric could be published for " + container.getContainerId());
+          LOG.error("Seems like client has been removed before the event"
+              + " could be published for " + container.getContainerId());
         }
       } catch (IOException e) {
         LOG.error("Failed to publish Container metrics for container "
@@ -208,110 +332,6 @@ public class NMTimelinePublisher extends CompositeService {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void publishContainerCreatedEvent(ContainerEvent event) {
-    ContainerId containerId = event.getContainerID();
-    ContainerEntity entity = createContainerEntity(containerId);
-    Container container = context.getContainers().get(containerId);
-    Resource resource = container.getResource();
-
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
-        resource.getMemorySize());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
-        resource.getVirtualCores());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
-        nodeId.getHost());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
-        nodeId.getPort());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
-        container.getPriority().toString());
-    entityInfo.put(
-        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
-        httpAddress);
-    entity.setInfo(entityInfo);
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-
-    long containerStartTime = container.getContainerStartTime();
-    entity.addEvent(tEvent);
-    entity.setCreatedTime(containerStartTime);
-    entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
-    dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
-        containerId.getApplicationAttemptId().getApplicationId()));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void publishContainerFinishedEvent(ContainerStatus containerStatus,
-      long containerFinishTime, long containerStartTime) {
-    ContainerId containerId = containerStatus.getContainerId();
-    TimelineEntity entity = createContainerEntity(containerId);
-
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
-        containerStatus.getDiagnostics());
-    entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
-        containerStatus.getExitStatus());
-    entityInfo.put(ContainerMetricsConstants.STATE_INFO,
-        ContainerState.COMPLETE.toString());
-    entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
-        containerFinishTime);
-    entity.setInfo(entityInfo);
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(containerFinishTime);
-    entity.addEvent(tEvent);
-    entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
-
-    dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
-        containerId.getApplicationAttemptId().getApplicationId()));
-  }
-
-  private void publishContainerLocalizationEvent(
-      ContainerLocalizationEvent event, String eventType) {
-    Container container = event.getContainer();
-    ContainerId containerId = container.getContainerId();
-    TimelineEntity entity = createContainerEntity(containerId);
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(eventType);
-    tEvent.setTimestamp(event.getTimestamp());
-    entity.addEvent(tEvent);
-    entity.setIdPrefix(TimelineServiceHelper.
-        invertLong(container.getContainerStartTime()));
-
-    ApplicationId appId =
-        container.getContainerId().getApplicationAttemptId().getApplicationId();
-    try {
-      // no need to put it as part of publisher as timeline client already has
-      // Queuing concept
-      TimelineV2Client timelineClient = getTimelineClient(appId);
-      if (timelineClient != null) {
-        timelineClient.putEntitiesAsync(entity);
-      } else {
-        LOG.error("Seems like client has been removed before the event could be"
-            + " published for " + container.getContainerId());
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to publish Container metrics for container "
-          + container.getContainerId());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to publish Container metrics for container "
-            + container.getContainerId(), e);
-      }
-    } catch (YarnException e) {
-      LOG.error("Failed to publish Container metrics for container "
-          + container.getContainerId(), e.getMessage());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to publish Container metrics for container "
-            + container.getContainerId(), e);
-      }
-    }
-  }
-
   private static ContainerEntity createContainerEntity(
       ContainerId containerId) {
     ContainerEntity entity = new ContainerEntity();
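
A self-contained sketch of the shape of the change above (class and method
names here are hypothetical, not from the commit): every container publish
method in NMTimelinePublisher now consults the publishNMContainerEvents field,
which serviceInit() fills from yarn.nodemanager.emit-container-events, and
silently skips publishing when it is false.

    class ContainerEventGateSketch {
      private final boolean publishNMContainerEvents;

      ContainerEventGateSketch(boolean publishNMContainerEvents) {
        // In the real publisher this flag is read once in serviceInit().
        this.publishNMContainerEvents = publishNMContainerEvents;
      }

      void publish(String event) {
        if (!publishNMContainerEvents) {
          return; // container events are switched off; nothing goes to ATSv2
        }
        // The real methods build a TimelineEntity/TimelineEvent here and hand
        // it to the dispatcher or the per-application TimelineV2Client.
        System.out.println("would publish: " + event);
      }
    }
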
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index 2585262..cf9ede0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -67,6 +67,8 @@ public class TestNMTimelinePublisher {
     conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
         3000L);
+    conf.setBoolean(YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
+        true);
     timelineClient = new DummyTimelineClient(null);
     Context context = createMockContext();
     dispatcher = new DrainDispatcher();
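
The updated test only pins the default (enabled) path. A complementary check,
purely hypothetical and not part of this commit, could repeat the same setup
with the flag switched off and assert that DummyTimelineClient never receives
an entity:

    conf.setBoolean(YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
        false);
    // ... run the publisher as above and verify that no putEntitiesAsync()
    // call reaches the dummy client.
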

