You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/01/17 01:27:46 UTC

hadoop git commit: YARN-2984. Metrics for container's actual memory usage. (kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 60cbcff2f -> 84198564b


YARN-2984. Metrics for container's actual memory usage. (kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84198564
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84198564
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84198564

Branch: refs/heads/trunk
Commit: 84198564ba6028d51c1fcf9cdcb87f6ae6e08513
Parents: 60cbcff
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sat Jan 17 05:44:04 2015 +0530
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Sat Jan 17 05:44:04 2015 +0530

----------------------------------------------------------------------
 .../metrics2/impl/MetricsCollectorImpl.java     |   3 +-
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  14 ++
 .../monitor/ContainerMetrics.java               | 172 +++++++++++++++++++
 .../monitor/ContainersMonitorImpl.java          |  22 ++-
 .../monitor/TestContainerMetrics.java           |  74 ++++++++
 6 files changed, 285 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84198564/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java
index be442ed..5345c1b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java
@@ -69,7 +69,8 @@ public class MetricsCollectorImpl implements MetricsCollector,
     return rbs.iterator();
   }
 
-  void clear() { rbs.clear(); }
+  @InterfaceAudience.Private
+  public void clear() { rbs.clear(); }
 
   MetricsCollectorImpl setRecordFilter(MetricsFilter rf) {
     recordFilter = rf;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84198564/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d069f6d..91781f9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -195,6 +195,8 @@ Release 2.7.0 - UNRELEASED
     YARN-2807. Option "--forceactive" not works as described in usage of
     "yarn rmadmin -transitionToActive". (Masatake Iwasaki via xgong)
 
+    YARN-2984. Metrics for container's actual memory usage. (kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84198564/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9ab5298..9ac5438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -822,6 +822,20 @@ public class YarnConfiguration extends Configuration {
       "container-monitor.procfs-tree.smaps-based-rss.enabled";
   public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED =
       false;
+
+  /** Enable/disable container metrics. */
+  @Private
+  public static final String NM_CONTAINER_METRICS_ENABLE =
+      NM_PREFIX + "container-metrics.enable";
+  @Private
+  public static final boolean DEFAULT_NM_CONTAINER_METRICS_ENABLE = true;
+
+  /** Container metrics flush period. -1 for flush on completion. */
+  @Private
+  public static final String NM_CONTAINER_METRICS_PERIOD_MS =
+      NM_PREFIX + "container-metrics.period-ms";
+  @Private
+  public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
   
   /** Prefix for all node manager disk health checker configs. */
   private static final String NM_DISK_HEALTH_CHECK_PREFIX =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84198564/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
new file mode 100644
index 0000000..12201dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+@InterfaceAudience.Private
+@Metrics(context="container")
+public class ContainerMetrics implements MetricsSource {
+  // Metrics source for one container; instances are cached by forContainer().
+  @Metric
+  public MutableStat pMemMBsStat; // physical memory samples, in MBs
+  // Base record info; sourceName() derives the per-container name from it.
+  static final MetricsInfo RECORD_INFO =
+      info("ContainerUsage", "Resource usage by container");
+
+  final MetricsInfo recordInfo; // named by sourceName(containerId)
+  final MetricsRegistry registry; // built from recordInfo
+  final ContainerId containerId;
+  final MetricsSystem metricsSystem; // used by getMetrics() to unregister
+
+  // Publishing state; after construction, accessed under this instance's lock
+  private long flushPeriodMs; // values <= 0 disable the periodic flush timer
+  private boolean flushOnPeriod = false; // set by timer task; cleared by getMetrics()
+  private boolean finished = false; // set by finished()
+  private boolean unregister = false; // set after the final snapshot is emitted
+  private Timer timer; // created on first schedule; cancelled by finished()
+
+  /**
+   * Per-container cache: filled by forContainer(), emptied by getMetrics().
+   */
+  protected final static Map<ContainerId, ContainerMetrics>
+      usageMetrics = new HashMap<>(); // NOTE(review): plain HashMap, see getMetrics()
+  // Sets up the registry, arms the flush timer if a period is set, creates pMem.
+  ContainerMetrics(
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+    this.recordInfo =
+        info(sourceName(containerId), RECORD_INFO.description());
+    this.registry = new MetricsRegistry(recordInfo);
+    this.metricsSystem = ms;
+    this.containerId = containerId;
+    this.flushPeriodMs = flushPeriodMs;
+    scheduleTimerTaskIfRequired();
+
+    this.pMemMBsStat = registry.newStat(
+        "pMem", "Physical memory stats", "Usage", "MBs", true);
+  }
+  // Tags the registry with containerId.toString() under info; returns this.
+  ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
+    registry.tag(info, containerId.toString());
+    return this;
+  }
+  // Source/record name: RECORD_INFO's name + "_" + the container id.
+  static String sourceName(ContainerId containerId) {
+    return RECORD_INFO.name() + "_" + containerId.toString();
+  }
+  // Overload passing flushPeriodMs = -1, i.e. no flush timer on creation.
+  public static ContainerMetrics forContainer(ContainerId containerId) {
+    return forContainer(containerId, -1L);
+  }
+  // Overload that uses DefaultMetricsSystem.instance() as the metrics system.
+  public static ContainerMetrics forContainer(
+      ContainerId containerId, long flushPeriodMs) {
+    return forContainer(
+        DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
+  }
+  // Returns the cached metrics for containerId, creating them on first use.
+  synchronized static ContainerMetrics forContainer(
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+    ContainerMetrics metrics = usageMetrics.get(containerId);
+    if (metrics == null) { // on a cache hit ms and flushPeriodMs are ignored
+      metrics = new ContainerMetrics(
+          ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
+
+      // Register with the MetricsSystem; cache whatever register() returns
+      if (ms != null) {
+        metrics =
+            ms.register(sourceName(containerId),
+                "Metrics for container: " + containerId, metrics);
+      }
+      usageMetrics.put(containerId, metrics);
+    }
+
+    return metrics;
+  }
+  // Emits a record only when finished or when a flush period has elapsed.
+  @Override
+  public synchronized void getMetrics(MetricsCollector collector, boolean all) {
+    // Lifecycle: registered -> finished -> unregistered (one call per step).
+    if (unregister) {
+      metricsSystem.unregisterSource(recordInfo.name());
+      usageMetrics.remove(containerId); // NOTE(review): instance lock only; forContainer() holds the class lock
+      return;
+    }
+    // Publish a snapshot of the registry only when a flush is due.
+    if (finished || flushOnPeriod) {
+      registry.snapshot(collector.addRecord(registry.info()), all);
+    }
+    // Final snapshot done: unregister on the next call; else re-arm the timer.
+    if (finished) {
+      this.unregister = true;
+    } else if (flushOnPeriod) {
+      flushOnPeriod = false;
+      scheduleTimerTaskIfRequired();
+    }
+  }
+  // Marks the container finished and cancels the flush timer, if any.
+  public synchronized void finished() {
+    this.finished = true;
+    if (timer != null) {
+      timer.cancel();
+      timer = null;
+    }
+  }
+  // Adds one physical-memory sample, in MBs, to the pMem stat.
+  public void recordMemoryUsage(int memoryMBs) {
+    this.pMemMBsStat.add(memoryMBs);
+  }
+  // Schedules a task that sets flushOnPeriod after flushPeriodMs, if > 0.
+  private synchronized void scheduleTimerTaskIfRequired() {
+    if (flushPeriodMs > 0) {
+      // Lazily create the timer (daemon thread) on first use
+      if (timer == null) {
+        this.timer = new Timer("Metrics flush checker", true);
+      }
+      TimerTask timerTask = new TimerTask() {
+        @Override
+        public void run() {
+          synchronized (ContainerMetrics.this) {
+            if (!finished) { // no flush requests once the container finished
+              flushOnPeriod = true;
+            }
+          }
+        }
+      };
+      timer.schedule(timerTask, flushPeriodMs); // single delayed execution
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84198564/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 02a63ac..0ae4325 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -51,6 +51,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private long monitoringInterval;
   private MonitoringThread monitoringThread;
+  private boolean containerMetricsEnabled;
+  private long containerMetricsPeriodMs;
 
   final List<ContainerId> containersToBeRemoved;
   final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
@@ -106,6 +108,13 @@ public class ContainersMonitorImpl extends AbstractService implements
     LOG.info(" Using ResourceCalculatorProcessTree : "
         + this.processTreeClass);
 
+    this.containerMetricsEnabled =
+        conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
+            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
+    this.containerMetricsPeriodMs =
+        conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
+            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
+
     long configuredPMemForContainers = conf.getLong(
         YarnConfiguration.NM_PMEM_MB,
         YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
@@ -352,6 +361,9 @@ public class ContainersMonitorImpl extends AbstractService implements
         // Remove finished containers
         synchronized (containersToBeRemoved) {
           for (ContainerId containerId : containersToBeRemoved) {
+            if (containerMetricsEnabled) {
+              ContainerMetrics.forContainer(containerId).finished();
+            }
             trackingContainers.remove(containerId);
             LOG.info("Stopping resource-monitoring for " + containerId);
           }
@@ -408,7 +420,15 @@ public class ContainersMonitorImpl extends AbstractService implements
             LOG.info(String.format(
                 "Memory usage of ProcessTree %s for container-id %s: ",
                      pId, containerId.toString()) +
-                formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
+                formatUsageString(
+                    currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
+
+            // Add usage to container metrics
+            if (containerMetricsEnabled) {
+              ContainerMetrics.forContainer(
+                  containerId, containerMetricsPeriodMs).recordMemoryUsage(
+                  (int) (currentPmemUsage >> 20));
+            }
 
             boolean isMemoryOverLimit = false;
             String msg = "";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84198564/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
new file mode 100644
index 0000000..158e2a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.util.Timer;
+
+public class TestContainerMetrics {
+  // Counts published records across periodic flushes, finish, and unregister.
+  @Test
+  public void testContainerMetricsFlow() throws InterruptedException {
+    final String ERR = "Error in number of records";
+
+    // Dummy MetricsSystem. NOTE(review): never passed to forContainer() below
+    MetricsSystem system = mock(MetricsSystem.class);
+    doReturn(this).when(system).register(anyString(), anyString(), any());
+
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    ContainerId containerId = mock(ContainerId.class);
+    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+    // The 100 ms flush period has not elapsed yet: nothing is published
+    metrics.recordMemoryUsage(1024);
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 0, collector.getRecords().size());
+    // After the first period has elapsed, one record is published
+    Thread.sleep(110);
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 1, collector.getRecords().size());
+    collector.clear();
+    // getMetrics() re-armed the timer, so a second period yields one more
+    Thread.sleep(110);
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 1, collector.getRecords().size());
+    collector.clear();
+    // finished() makes the next getMetrics() publish the final record
+    metrics.finished();
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 1, collector.getRecords().size());
+    collector.clear();
+    // The call after the final record unregisters and publishes nothing
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 0, collector.getRecords().size());
+    // Later calls keep publishing nothing, even after another period
+    Thread.sleep(110);
+    metrics.getMetrics(collector, true);
+    assertEquals(ERR, 0, collector.getRecords().size());
+  }
+}