You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/30 21:01:43 UTC

[16/50] [abbrv] hadoop git commit: YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)

YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9686584f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9686584f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9686584f

Branch: refs/heads/HDDS-48
Commit: 9686584f919df716b415c1477572b4c31752c872
Parents: 4772e79
Author: Haibo Chen <ha...@apache.org>
Authored: Wed May 23 11:29:55 2018 -0700
Committer: Hanisha Koneru <ha...@apache.org>
Committed: Wed May 30 14:00:25 2018 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  26 +-
 .../src/main/resources/yarn-default.xml         |  67 ++-
 .../src/CMakeLists.txt                          |  19 +
 .../CGroupElasticMemoryController.java          | 476 +++++++++++++++++++
 .../linux/resources/CGroupsHandler.java         |   6 +
 .../linux/resources/CGroupsHandlerImpl.java     |   6 +-
 .../CGroupsMemoryResourceHandlerImpl.java       |  15 -
 .../linux/resources/DefaultOOMHandler.java      | 254 ++++++++++
 .../monitor/ContainersMonitorImpl.java          |  50 ++
 .../executor/ContainerSignalContext.java        |  41 ++
 .../native/oom-listener/impl/oom_listener.c     | 171 +++++++
 .../native/oom-listener/impl/oom_listener.h     | 102 ++++
 .../oom-listener/impl/oom_listener_main.c       | 104 ++++
 .../oom-listener/test/oom_listener_test_main.cc | 292 ++++++++++++
 .../resources/DummyRunnableWithContext.java     |  31 ++
 .../TestCGroupElasticMemoryController.java      | 319 +++++++++++++
 .../TestCGroupsMemoryResourceHandlerImpl.java   |   6 +-
 .../linux/resources/TestDefaultOOMHandler.java  | 307 ++++++++++++
 .../monitor/TestContainersMonitor.java          |   1 +
 .../TestContainersMonitorResourceChange.java    |   3 +-
 .../site/markdown/NodeManagerCGroupsMemory.md   | 133 ++++++
 22 files changed, 2391 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 934c009..428950b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,7 @@
 target
 build
 dependency-reduced-pom.xml
+make-build-debug
 
 # Filesystem contract test options and credentials
 auth-keys.xml

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8e56cb8..6d08831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1440,6 +1440,25 @@ public class YarnConfiguration extends Configuration {
     NM_PREFIX + "vmem-pmem-ratio";
   public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;
 
+  /** Specifies whether to do memory check on overall usage. */
+  public static final String NM_ELASTIC_MEMORY_CONTROL_ENABLED = NM_PREFIX
+      + "elastic-memory-control.enabled";
+  public static final boolean DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED = false;
+
+  /** Specifies the OOM handler code. */
+  public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER = NM_PREFIX
+      + "elastic-memory-control.oom-handler";
+
+  /** The path to the OOM listener.*/
+  public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH =
+      NM_PREFIX + "elastic-memory-control.oom-listener.path";
+
+  /** Maximum time in seconds to resolve an OOM situation. */
+  public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC =
+      NM_PREFIX + "elastic-memory-control.timeout-sec";
+  public static final Integer
+      DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = 5;
+
   /** Number of Virtual CPU Cores which can be allocated for containers.*/
   public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
   public static final int DEFAULT_NM_VCORES = 8;
@@ -2006,13 +2025,6 @@ public class YarnConfiguration extends Configuration {
   /** The path to the Linux container executor.*/
   public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
     NM_PREFIX + "linux-container-executor.path";
-  
-  /** 
-   * The UNIX group that the linux-container-executor should run as.
-   * This is intended to be set as part of container-executor.cfg. 
-   */
-  public static final String NM_LINUX_CONTAINER_GROUP =
-    NM_PREFIX + "linux-container-executor.group";
 
   /**
    * True if linux-container-executor should limit itself to one user

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 156ca24..da44ccb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -772,7 +772,7 @@
   <property>
     <description>Maximum size in bytes for configurations that can be provided
       by application to RM for delegation token renewal.
-      By experiment, it's roughly 128 bytes per key-value pair.
+      By experiment, its roughly 128 bytes per key-value pair.
       The default value 12800 allows roughly 100 configs, may be less.
     </description>
     <name>yarn.resourcemanager.delegation-token.max-conf-size-bytes</name>
@@ -1860,14 +1860,6 @@
   </property>
 
   <property>
-    <description>
-    The UNIX group that the linux-container-executor should run as.
-    </description>
-    <name>yarn.nodemanager.linux-container-executor.group</name>
-    <value></value>
-  </property>
-
-  <property>
     <description>T-file compression types used to compress aggregated logs.</description>
     <name>yarn.nodemanager.log-aggregation.compression-type</name>
     <value>none</value>
@@ -2158,7 +2150,7 @@
     <description>
     In the server side it indicates whether timeline service is enabled or not.
     And in the client side, users can enable it to indicate whether client wants
-    to use timeline service. If it's enabled in the client side along with
+    to use timeline service. If its enabled in the client side along with
     security, then yarn client tries to fetch the delegation tokens for the
     timeline server.
     </description>
@@ -3404,7 +3396,7 @@
     <description>
       Defines the limit of the diagnostics message of an application
       attempt, in kilo characters (character count * 1024).
-      When using ZooKeeper to store application state behavior, it's
+      When using ZooKeeper to store application state behavior, its
       important to limit the size of the diagnostic messages to
       prevent YARN from overwhelming ZooKeeper. In cases where
       yarn.resourcemanager.state-store.max-completed-applications is set to
@@ -3819,4 +3811,57 @@
     <value>/usr/bin/numactl</value>
   </property>
 
+  <property>
+    <description>
+      Enable elastic memory control. This is a Linux only feature.
+      When enabled, the node manager adds a listener to receive an
+      event, if all the containers exceeded a limit.
+      The limit is specified by yarn.nodemanager.resource.memory-mb.
+      If this is not set, the limit is set based on the capabilities.
+      See yarn.nodemanager.resource.detect-hardware-capabilities
+      for details.
+      The limit applies to the physical or virtual (rss+swap) memory
+      depending on whether yarn.nodemanager.pmem-check-enabled or
+      yarn.nodemanager.vmem-check-enabled is set.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      The name of a JVM class. The class must implement the Runnable
+      interface. It is called,
+      if yarn.nodemanager.elastic-memory-control.enabled
+      is set and the system reaches its memory limit.
+      When called the handler must preempt a container,
+      since all containers are frozen by cgroups.
+      Once preempted some memory is released, so that the
+      kernel can resume all containers. Because of this the
+      handler has to act quickly.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.oom-handler</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.DefaultOOMHandler</value>
+  </property>
+
+  <property>
+    <description>
+      The path to the oom-listener tool. Elastic memory control is only
+      supported on Linux. It relies on kernel events. The tool forwards
+      these kernel events to its standard output, so that the node manager
+      can preempt containers in an out-of-memory scenario.
+      You rarely need to update this setting.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.oom-listener.path</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
+      Maximum time to wait for an OOM situation to get resolved before
+      bringing down the node.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.timeout-sec</name>
+    <value>5</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
index 79faeec..a614f80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
@@ -30,6 +30,7 @@ string(REGEX MATCH . HCD_ONE "${HADOOP_CONF_DIR}")
 string(COMPARE EQUAL ${HCD_ONE} / HADOOP_CONF_DIR_IS_ABS)
 
 set (CMAKE_C_STANDARD 99)
+set (CMAKE_CXX_STANDARD 11)
 
 include(CheckIncludeFiles)
 check_include_files("sys/types.h;sys/sysctl.h" HAVE_SYS_SYSCTL_H)
@@ -113,6 +114,7 @@ include_directories(
     ${GTEST_SRC_DIR}/include
     main/native/container-executor
     main/native/container-executor/impl
+    main/native/oom-listener/impl
 )
 # add gtest as system library to suppress gcc warnings
 include_directories(SYSTEM ${GTEST_SRC_DIR}/include)
@@ -171,3 +173,20 @@ add_executable(cetest
         main/native/container-executor/test/utils/test_docker_util.cc)
 target_link_libraries(cetest gtest container)
 output_directory(cetest test)
+
+# CGroup OOM listener
+add_executable(oom-listener
+        main/native/oom-listener/impl/oom_listener.c
+        main/native/oom-listener/impl/oom_listener.h
+        main/native/oom-listener/impl/oom_listener_main.c
+)
+output_directory(oom-listener target/usr/local/bin)
+
+# CGroup OOM listener test with GTest
+add_executable(test-oom-listener
+        main/native/oom-listener/impl/oom_listener.c
+        main/native/oom-listener/impl/oom_listener.h
+        main/native/oom-listener/test/oom_listener_test_main.cc
+)
+target_link_libraries(test-oom-listener gtest)
+output_directory(test-oom-listener test)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
new file mode 100644
index 0000000..752c3a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+import java.io.File;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.nio.charset.Charset;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_NO_LIMIT;
+
+/**
+ * This thread controls memory usage using cgroups. It listens to out of memory
+ * events of all the containers together, and if we go over the limit picks
+ * a container to kill. The algorithm that picks the container is a plugin.
+ */
+public class CGroupElasticMemoryController extends Thread {
+  protected static final Log LOG = LogFactory
+      .getLog(CGroupElasticMemoryController.class);
+  private final Clock clock = new MonotonicClock();
+  private String yarnCGroupPath;
+  private String oomListenerPath;
+  private Runnable oomHandler;
+  private CGroupsHandler cgroups;
+  private boolean controlPhysicalMemory;
+  private boolean controlVirtualMemory;
+  private long limit;
+  private Process process = null;
+  private boolean stopped = false;
+  private int timeoutMS;
+
+  /**
+   * Creates the controller, optionally with an explicit OOM handler.
+   * Only one kind of memory can be controlled at a time: if both physical
+   * and virtual memory checks are requested, physical memory is enforced
+   * and a warning is logged; if neither is requested, construction fails.
+   * @param conf Yarn configuration to use
+   * @param context Node manager context to out of memory handler
+   * @param cgroups Cgroups handler configured
+   * @param controlPhysicalMemory Whether to listen to physical memory OOM
+   * @param controlVirtualMemory Whether to listen to virtual memory OOM
+   * @param limit memory limit in bytes
+   * @param oomHandlerOverride optional OOM handler, may be null
+   * @exception YarnException Could not instantiate the OOM handler class,
+   * or neither physical nor virtual memory check was requested
+   */
+  @VisibleForTesting
+  CGroupElasticMemoryController(Configuration conf,
+                                       Context context,
+                                       CGroupsHandler cgroups,
+                                       boolean controlPhysicalMemory,
+                                       boolean controlVirtualMemory,
+                                       long limit,
+                                       Runnable oomHandlerOverride)
+      throws YarnException {
+    super("CGroupElasticMemoryController");
+    // Virtual memory is controlled only if it is the sole check requested.
+    boolean controlVirtual = controlVirtualMemory && !controlPhysicalMemory;
+    Runnable oomHandlerTemp =
+        getDefaultOOMHandler(conf, context, oomHandlerOverride, controlVirtual);
+    if (controlPhysicalMemory && controlVirtualMemory) {
+      // controlVirtual is false in this case, so physical memory is enforced.
+      LOG.warn(
+          NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
+          "We cannot control both virtual and physical " +
+          "memory at the same time. Enforcing physical memory. " +
+          "If swapping is enabled set " +
+          "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
+          "only " + NM_VMEM_CHECK_ENABLED + " to true.");
+    }
+    if (!controlPhysicalMemory && !controlVirtualMemory) {
+      throw new YarnException(
+          NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
+              "We need either virtual or physical memory check requested. " +
+              "If swapping is enabled set " +
+              "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
+              "only " + NM_VMEM_CHECK_ENABLED + " to true.");
+    }
+    // All argument validation is done above; the fields are assigned only
+    // after it has passed.
+    this.timeoutMS =
+        1000 * conf.getInt(NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC,
+        DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC);
+    this.oomListenerPath = getOOMListenerExecutablePath(conf);
+    this.oomHandler = oomHandlerTemp;
+    this.cgroups = cgroups;
+    this.controlPhysicalMemory = !controlVirtual;
+    this.controlVirtualMemory = controlVirtual;
+    this.yarnCGroupPath = this.cgroups
+        .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, "");
+    this.limit = limit;
+  }
+
+  /**
+   * Get the OOM handler to use. An explicit override is returned as is;
+   * otherwise the class configured under
+   * {@link YarnConfiguration#NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER}
+   * (default {@link DefaultOOMHandler}) is instantiated through its
+   * {@code (Context, boolean)} constructor.
+   * @param conf configuration
+   * @param context context to pass to constructor
+   * @param oomHandlerLocal Default override, may be null
+   * @param controlVirtual Control physical or virtual memory
+   * @return The configured or overridden OOM handler.
+   * @throws YarnException in case the constructor failed
+   */
+  private Runnable getDefaultOOMHandler(
+      Configuration conf, Context context, Runnable oomHandlerLocal,
+      boolean controlVirtual)
+      throws YarnException {
+    if (oomHandlerLocal != null) {
+      // An explicit override wins; the configured class is not needed.
+      return oomHandlerLocal;
+    }
+    Class<?> oomHandlerClass =
+        conf.getClass(
+            YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER,
+            DefaultOOMHandler.class);
+    try {
+      Constructor<?> constr = oomHandlerClass.getConstructor(
+          Context.class, boolean.class);
+      return (Runnable) constr.newInstance(context, controlVirtual);
+    } catch (Exception ex) {
+      // Covers reflection failures as well as a class that is not a Runnable
+      throw new YarnException(ex);
+    }
+  }
+
+  /**
+   * Creates the controller without an OOM handler override; delegates to
+   * the seven-argument constructor, passing null as the handler.
+   * @param conf Yarn configuration to use
+   * @param context Node manager context to out of memory handler
+   * @param cgroups Cgroups handler configured
+   * @param controlPhysicalMemory Whether to listen to physical memory OOM
+   * @param controlVirtualMemory Whether to listen to virtual memory OOM
+   * @param limit memory limit in bytes
+   * @exception YarnException Could not instantiate class
+   */
+  public CGroupElasticMemoryController(Configuration conf,
+                                       Context context,
+                                       CGroupsHandler cgroups,
+                                       boolean controlPhysicalMemory,
+                                       boolean controlVirtualMemory,
+                                       long limit)
+      throws YarnException {
+    this(conf,
+        context,
+        cgroups,
+        controlPhysicalMemory,
+        controlVirtualMemory,
+        limit,
+        null);
+  }
+
+  /**
+   * Exception thrown if the OOM situation is not resolved.
+   */
+  static private class OOMNotResolvedException extends YarnRuntimeException {
+    /**
+     * @param message description of why the OOM was not resolved
+     * @param parent underlying cause, may be null
+     */
+    OOMNotResolvedException(String message, Exception parent) {
+      super(message, parent);
+    }
+  }
+
+  /**
+   * Stop listening to the cgroup. Marks the controller as stopped and
+   * forcibly terminates the listener process, if one has been started.
+   */
+  public synchronized void stopListening() {
+    stopped = true;
+    if (process == null) {
+      LOG.warn("Trying to stop listening, when listening is not running");
+      return;
+    }
+    process.destroyForcibly();
+  }
+
+  /**
+   * Checks if the CGroupElasticMemoryController is available on this system.
+   * This assumes that Linux container executor is already initialized.
+   * We need to have CGroups enabled.
+   *
+   * @return True if CGroupElasticMemoryController is available.
+   * False otherwise.
+   */
+  public static boolean isAvailable() {
+    try {
+      if (!Shell.LINUX) {
+        LOG.info("CGroupElasticMemoryController currently is supported only "
+            + "on Linux.");
+        return false;
+      }
+      // Both the cgroups handler and the memory resource handler must exist
+      if (ResourceHandlerModule.getCGroupsHandler() == null ||
+          ResourceHandlerModule.getMemoryResourceHandler() == null) {
+        LOG.info("CGroupElasticMemoryController requires enabling " +
+            "memory CGroups with " +
+            YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED);
+        return false;
+      }
+    } catch (SecurityException se) {
+      LOG.info("Failed to get Operating System name. " + se);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Main OOM listening thread. It uses an external process to listen to
+   * Linux events. The external process does not need to run as root, so
+   * it is not related to container-executor. We do not use JNI for security
+   * reasons.
+   * Each 8 byte event read from the listener triggers the OOM handler.
+   * On exit the listener process is destroyed, the executor is shut down
+   * and the cgroup parameters are reset.
+   */
+  @Override
+  public void run() {
+    ExecutorService executor = null;
+    try {
+      // Disable OOM killer and set a limit.
+      // This has to be set first, so that we get notified about valid events.
+      // We will be notified about events even, if they happened before
+      // oom-listener started
+      setCGroupParameters();
+
+      // Start a listener process
+      ProcessBuilder oomListener = new ProcessBuilder();
+      oomListener.command(oomListenerPath, yarnCGroupPath);
+      synchronized (this) {
+        if (!stopped) {
+          process = oomListener.start();
+        } else {
+          resetCGroupParameters();
+          LOG.info("Listener stopped before starting");
+          return;
+        }
+      }
+      LOG.info(String.format("Listening on %s with %s",
+          yarnCGroupPath,
+          oomListenerPath));
+
+      // We need 1 thread for the error stream and another one
+      // as a watchdog for the OOM killer
+      executor = Executors.newFixedThreadPool(2);
+
+      // Listen to any errors in the background. We do not expect this to
+      // be large in size, so it will fit into a string.
+      Future<String> errorListener = executor.submit(
+          () -> IOUtils.toString(process.getErrorStream(),
+              Charset.defaultCharset()));
+
+      // We get Linux event increments (8 bytes) forwarded from the event stream
+      // The events cannot be split, so it is safe to read them as a whole
+      // There is no race condition with the cgroup
+      // running out of memory. If oom is 1 at startup
+      // oom_listener will send an initial notification
+      InputStream events = process.getInputStream();
+      byte[] event = new byte[8];
+      int read;
+      // This loop can be exited by terminating the process
+      // with stopListening()
+      while ((read = events.read(event)) == event.length) {
+        // An OOM event has occurred
+        resolveOOM(executor);
+      }
+
+      if (read != -1) {
+        LOG.warn(String.format("Characters returned from event handler: %d",
+            read));
+      }
+
+      // If the input stream is closed, we wait for exit or process terminated.
+      int exitCode = process.waitFor();
+      String error = errorListener.get();
+      process = null;
+      LOG.info(String.format("OOM listener exited %d %s", exitCode, error));
+    } catch (OOMNotResolvedException ex) {
+      // We could mark the node unhealthy but it shuts down the node anyways.
+      // Let's just bring down the node manager all containers are frozen.
+      throw new YarnRuntimeException("Could not resolve OOM", ex);
+    } catch (Exception ex) {
+      synchronized (this) {
+        if (!stopped) {
+          LOG.warn("OOM Listener exiting.", ex);
+        }
+      }
+    } finally {
+      // Make sure we do not leak the child process,
+      // especially if process.waitFor() did not finish.
+      if (process != null && process.isAlive()) {
+        process.destroyForcibly();
+      }
+      if (executor != null) {
+        // Request the shutdown first: awaitTermination() can only return
+        // before its timeout once a shutdown has been requested and all
+        // submitted tasks have completed.
+        executor.shutdown();
+        try {
+          executor.awaitTermination(6, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          LOG.warn("Exiting without processing all OOM events.");
+        }
+      }
+      resetCGroupParameters();
+    }
+  }
+
+  /**
+   * Resolve an OOM event by running the OOM handler, while a watchdog
+   * task monitors whether the OOM state clears in time.
+   * @param executor Executor to create watchdog with.
+   * @throws InterruptedException interrupted
+   * @throws java.util.concurrent.ExecutionException cannot launch watchdog
+   */
+  private void resolveOOM(ExecutorService executor)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    // The watchdog only observes and logs while we are still in OOM
+    final long oomStart = clock.getTime();
+    Future<Boolean> oomWatchdog =
+        executor.submit(() -> watchAndLogOOMState(oomStart));
+
+    // Kill something to resolve the issue
+    try {
+      oomHandler.run();
+    } catch (RuntimeException handlerFailure) {
+      oomWatchdog.cancel(true);
+      throw new OOMNotResolvedException("OOM handler failed", handlerFailure);
+    }
+
+    boolean resolved = oomWatchdog.get();
+    if (resolved) {
+      return;
+    }
+    // If we are still in OOM,
+    // the watchdog will trigger stop
+    // listening to exit this loop
+    throw new OOMNotResolvedException("OOM handler timed out", null);
+  }
+
+  /**
+   * Watch the memory cgroup while it is under OOM and log the progress.
+   * The OOM state is polled every 10 ms; while it persists a warning is
+   * logged at most once per second. If the OOM is not resolved before the
+   * configured timeout elapses, or the watch is interrupted or fails,
+   * listening is stopped.
+   * @param start time the OOM handling started, as read from the clock
+   * @return if the OOM was resolved successfully
+   */
+  private boolean watchAndLogOOMState(long start) {
+    long lastLog = start;
+    try {
+      long end = start;
+      // Give up, if we are still in OOM after the configured timeout
+      while(end - start < timeoutMS) {
+        end = clock.getTime();
+        String underOOM = cgroups.getCGroupParam(
+            CGroupsHandler.CGroupController.MEMORY,
+            "",
+            CGROUP_PARAM_MEMORY_OOM_CONTROL);
+        if (underOOM.contains(CGroupsHandler.UNDER_OOM)) {
+          // Still in OOM: throttle the warning to one per second
+          if (end - lastLog > 1000) {
+            LOG.warn(String.format(
+                "OOM not resolved in %d ms", end - start));
+            lastLog = end;
+          }
+        } else {
+          LOG.info(String.format(
+              "Resolved OOM in %d ms", end - start));
+          return true;
+        }
+        // We do not want to saturate the CPU
+        // leaving the resources to the actual OOM killer
+        // but we want to be fast, too.
+        Thread.sleep(10);
+      }
+    } catch (InterruptedException ex) {
+      LOG.debug("Watchdog interrupted");
+    } catch (Exception e) {
+      LOG.warn("Exception running logging thread", e);
+    }
+    // Reached on timeout, interruption or error: report and stop listening
+    LOG.warn(String.format("OOM was not resolved in %d ms",
+        clock.getTime() - start));
+    stopListening();
+    return false;
+  }
+
+  /**
+   * Update root memory cgroup. This contains all containers.
+   * The OOM killer is disabled and the limit is applied either to
+   * physical or to virtual memory.
+   * The physical limit has to be set first then the virtual limit.
+   * @throws ResourceHandlerException if a parameter cannot be updated or
+   * the physical and virtual flags are not mutually exclusive
+   */
+  private void setCGroupParameters() throws ResourceHandlerException {
+    final CGroupsHandler.CGroupController memory =
+        CGroupsHandler.CGroupController.MEMORY;
+    final String limitValue = Long.toString(limit);
+
+    // Disable the OOM killer
+    cgroups.updateCGroupParam(memory, "",
+        CGROUP_PARAM_MEMORY_OOM_CONTROL, "1");
+
+    // Exactly one of physical and virtual control has to be selected
+    if (controlPhysicalMemory == controlVirtualMemory) {
+      throw new ResourceHandlerException(
+          String.format("Unsupported scenario physical:%b virtual:%b",
+              controlPhysicalMemory, controlVirtualMemory));
+    }
+
+    if (controlPhysicalMemory) {
+      try {
+        // Ignore virtual memory limits, since we do not know what it is set to
+        cgroups.updateCGroupParam(memory, "",
+            CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+      } catch (ResourceHandlerException ex) {
+        LOG.debug("Swap monitoring is turned off in the kernel");
+      }
+      // Set physical memory limits
+      cgroups.updateCGroupParam(memory, "",
+          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, limitValue);
+      return;
+    }
+
+    // Virtual memory control:
+    // ignore virtual memory limits, since we do not know what it is set to
+    cgroups.updateCGroupParam(memory, "",
+        CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+    // Set physical limits to no more than virtual limits
+    cgroups.updateCGroupParam(memory, "",
+        CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, limitValue);
+    // Set virtual memory limits
+    // Important: it has to be set after physical limit is set
+    cgroups.updateCGroupParam(memory, "",
+        CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, limitValue);
+  }
+
+  /**
+   * Reset root memory cgroup to OS defaults. This controls all containers.
+   * Failures are logged and not propagated.
+   */
+  private void resetCGroupParameters() {
+    final CGroupsHandler.CGroupController memory =
+        CGroupsHandler.CGroupController.MEMORY;
+    try {
+      // Disable memory limits, the swap limit is best effort
+      try {
+        cgroups.updateCGroupParam(memory, "",
+            CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+      } catch (ResourceHandlerException swapFailure) {
+        LOG.debug("Swap monitoring is turned off in the kernel");
+      }
+      cgroups.updateCGroupParam(memory, "",
+          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+      // Enable the OOM killer
+      cgroups.updateCGroupParam(memory, "",
+          CGROUP_PARAM_MEMORY_OOM_CONTROL, "0");
+    } catch (ResourceHandlerException cleanupFailure) {
+      LOG.warn("Error in cleanup", cleanupFailure);
+    }
+  }
+
+  /**
+   * Determine the path of the oom-listener executable. The configured
+   * value is used if set; otherwise the path defaults to bin/oom-listener
+   * under the Yarn home directory taken from the environment, or under
+   * the current directory if that variable is not set.
+   * @param conf configuration to read the path from
+   * @return path to the oom-listener executable
+   */
+  private static String getOOMListenerExecutablePath(Configuration conf) {
+    String yarnHome =
+        System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
+    File binDir = new File(yarnHome != null ? yarnHome : ".", "bin");
+    String defaultPath = new File(binDir, "oom-listener").getAbsolutePath();
+    final String path = conf.get(
+        YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+        defaultPath);
+    LOG.debug(String.format("oom-listener path: %s %s", path, defaultPath));
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
index e279504..9dc16c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
@@ -76,8 +76,14 @@ public interface CGroupsHandler {
   String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
 
   String CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES = "limit_in_bytes";
+  String CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES = "memsw.limit_in_bytes";
   String CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES = "soft_limit_in_bytes";
+  String CGROUP_PARAM_MEMORY_OOM_CONTROL = "oom_control";
   String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness";
+  String CGROUP_PARAM_MEMORY_USAGE_BYTES = "usage_in_bytes";
+  String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
+  String CGROUP_NO_LIMIT = "-1";
+  String UNDER_OOM = "under_oom 1";
 
 
   String CGROUP_CPU_PERIOD_US = "cfs_period_us";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
index 008f3d7..6ed94e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
@@ -594,7 +594,11 @@ class CGroupsHandlerImpl implements CGroupsHandler {
   @Override
   public String getCGroupParam(CGroupController controller, String cGroupId,
       String param) throws ResourceHandlerException {
-    String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
+    String cGroupParamPath =
+        param.equals(CGROUP_FILE_TASKS) ?
+            getPathForCGroup(controller, cGroupId)
+                + Path.SEPARATOR + param :
+        getPathForCGroupParam(controller, cGroupId, param);
 
     try {
       byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
index 2d1585e..a57adb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
@@ -65,21 +65,6 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
   @Override
   public List<PrivilegedOperation> bootstrap(Configuration conf)
       throws ResourceHandlerException {
-    boolean pmemEnabled =
-        conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED,
-            YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
-    boolean vmemEnabled =
-        conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
-            YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
-    if (pmemEnabled || vmemEnabled) {
-      String msg = "The default YARN physical and/or virtual memory health"
-          + " checkers as well as the CGroups memory controller are enabled. "
-          + "If you wish to use the Cgroups memory controller, please turn off"
-          + " the default physical/virtual memory checkers by setting "
-          + YarnConfiguration.NM_PMEM_CHECK_ENABLED + " and "
-          + YarnConfiguration.NM_VMEM_CHECK_ENABLED + " to false.";
-      throw new ResourceHandlerException(msg);
-    }
     this.cGroupsHandler.initializeCGroupController(MEMORY);
     enforce = conf.getBoolean(
         YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
new file mode 100644
index 0000000..c690225
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
+
+/**
+ * A very basic OOM handler implementation.
+ * See the javadoc on the run() method for details.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DefaultOOMHandler implements Runnable {
+  protected static final Log LOG = LogFactory
+      .getLog(DefaultOOMHandler.class);
+  private final Context context;
+  private final boolean virtual;
+  private CGroupsHandler cgroups;
+
+  /**
+   * Create an OOM handler.
+   * This has to be public to be able to construct through reflection.
+   * @param context node manager context to work with
+   * @param testVirtual Test virtual memory or physical
+   */
+  public DefaultOOMHandler(Context context, boolean testVirtual) {
+    this.context = context;
+    this.virtual = testVirtual;
+    this.cgroups = ResourceHandlerModule.getCGroupsHandler();
+  }
+
+  @VisibleForTesting
+  void setCGroupsHandler(CGroupsHandler handler) {
+    cgroups = handler;
+  }
+
+  /**
+   * Kill the container, if it has exceeded its request.
+   *
+   * @param container Container to check
+   * @param fileName  CGroup filename (physical or swap/virtual)
+   * @return true, if the container was preempted
+   */
+  private boolean killContainerIfOOM(Container container, String fileName) {
+    String value = null;
+    try {
+      value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+          container.getContainerId().toString(),
+          fileName);
+      long usage = Long.parseLong(value);
+      long request = container.getResource().getMemorySize() * 1024 * 1024;
+
+      // Check if the container has exceeded its limits.
+      if (usage > request) {
+        // Kill the container
+        // We could call the regular cleanup but that sends a
+        // SIGTERM first that cannot be handled by frozen processes.
+        // Walk through the cgroup
+        // tasks file and kill all processes in it
+        sigKill(container);
+        String message = String.format(
+            "Container %s was killed by elastic cgroups OOM handler using %d " +
+                "when requested only %d",
+            container.getContainerId(), usage, request);
+        LOG.warn(message);
+        return true;
+      }
+    } catch (ResourceHandlerException ex) {
+      LOG.warn(String.format("Could not access memory resource for %s",
+          container.getContainerId()), ex);
+    } catch (NumberFormatException ex) {
+      LOG.warn(String.format("Could not parse %s in %s",
+          value, container.getContainerId()));
+    }
+    return false;
+  }
+
+  /**
+   * SIGKILL the specified container. We do this not using the standard
+   * container logic. The reason is that the processes are frozen by
+   * the cgroups OOM handler, so they cannot respond to SIGTERM.
+   * On the other hand we have to be as fast as possible.
+   * We walk through the list of active processes in the container.
+   * This is needed because frozen parents cannot signal their children.
+   * We kill each process and then try again until the whole cgroup
+   * is cleaned up. This logic avoids leaking processes in a cgroup.
+   * Currently the killing only succeeds for PGIDS.
+   *
+   * @param container Container to clean up
+   */
+  private void sigKill(Container container) {
+    boolean finished = false;
+    try {
+      while (!finished) {
+        String[] pids =
+            cgroups.getCGroupParam(
+                CGroupsHandler.CGroupController.MEMORY,
+                container.getContainerId().toString(),
+                CGROUP_FILE_TASKS)
+                .split("\n");
+        finished = true;
+        for (String pid : pids) {
+          // Note: this kills only PGIDs currently
+          if (pid != null && !pid.isEmpty()) {
+            LOG.debug(String.format(
+                "Terminating container %s Sending SIGKILL to -%s",
+                container.getContainerId().toString(),
+                pid));
+            finished = false;
+            try {
+              context.getContainerExecutor().signalContainer(
+                  new ContainerSignalContext.Builder().setContainer(container)
+                      .setUser(container.getUser())
+                      .setPid(pid).setSignal(ContainerExecutor.Signal.KILL)
+                      .build());
+            } catch (IOException ex) {
+              LOG.warn(String.format("Cannot kill container %s pid -%s.",
+                  container.getContainerId(), pid), ex);
+            }
+          }
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while waiting for processes to disappear");
+        }
+      }
+    } catch (ResourceHandlerException ex) {
+      LOG.warn(String.format(
+          "Cannot list more tasks in container %s to kill.",
+          container.getContainerId()));
+    }
+  }
+
+  /**
+   * It is called when the node is under an OOM condition. All processes in
+   * all sub-cgroups are suspended. We need to act fast, so that we do not
+   * affect the overall system utilization.
+   * In general we try to find a newly run container that exceeded its limits.
+   * The justification is cost, since probably this is the one that has
+   * accumulated the least amount of uncommitted data so far.
+   * We continue the process until the OOM is resolved.
+   */
+  @Override
+  public void run() {
+    try {
+      // Reverse order by start time
+      // Long.compare cannot overflow, unlike subtracting the timestamps
+      Comparator<Container> comparator = (Container o1, Container o2) ->
+          Long.compare(
+              o2.getContainerStartTime(), o1.getContainerStartTime());
+
+      // We kill containers until the kernel reports the OOM situation resolved
+      // Note: If the kernel has a delay this may kill more than necessary
+      while (true) {
+        String status = cgroups.getCGroupParam(
+            CGroupsHandler.CGroupController.MEMORY,
+            "",
+            CGROUP_PARAM_MEMORY_OOM_CONTROL);
+        if (!status.contains(CGroupsHandler.UNDER_OOM)) {
+          break;
+        }
+
+        // The first pass kills a recent container
+        // that uses more than its request
+        ArrayList<Container> containers =
+            new ArrayList<>(context.getContainers().values());
+        // Note: Sorting may take a long time with 10K+ containers
+        // but it is acceptable now with low number of containers per node
+        containers.sort(comparator);
+
+        // Kill the latest container that exceeded its request
+        boolean found = false;
+        for (Container container : containers) {
+          if (!virtual) {
+            if (killContainerIfOOM(container,
+                CGROUP_PARAM_MEMORY_USAGE_BYTES)) {
+              found = true;
+              break;
+            }
+          } else {
+            if (killContainerIfOOM(container,
+                CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
+              found = true;
+              break;
+            }
+          }
+        }
+        if (found) {
+          continue;
+        }
+
+        // We have not found any containers that ran out of their limit,
+        // so we will kill the latest one. This can happen, if all use
+        // close to their request and one of them requests a big block
+        // triggering the OOM freeze.
+        // Currently there is no other way to identify the outstanding one.
+        if (containers.size() > 0) {
+          Container container = containers.get(0);
+          sigKill(container);
+          String message = String.format(
+              "Newest container %s killed by elastic cgroups OOM handler",
+              container.getContainerId());
+          LOG.warn(message);
+          continue;
+        }
+
+        // This can happen, if SIGKILL did not clean up
+        // non-PGID or containers or containers launched by other users
+        // or if a process was put to the root YARN cgroup.
+        throw new YarnRuntimeException(
+            "Could not find any containers but CGroups " +
+                "reserved for containers ran out of memory. " +
+                "I am giving up");
+      }
+    } catch (ResourceHandlerException ex) {
+      LOG.warn("Could not fetch OOM status. " +
+          "This is expected at shutdown. Exiting.", ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 35015c2..bd68dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +67,7 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private long monitoringInterval;
   private MonitoringThread monitoringThread;
+  private CGroupElasticMemoryController oomListenerThread;
   private boolean containerMetricsEnabled;
   private long containerMetricsPeriodMs;
   private long containerMetricsUnregisterDelayMs;
@@ -85,6 +89,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private boolean pmemCheckEnabled;
   private boolean vmemCheckEnabled;
+  private boolean elasticMemoryEnforcement;
+  private boolean strictMemoryEnforcement;
   private boolean containersMonitorEnabled;
 
   private long maxVCoresAllottedForContainers;
@@ -173,8 +179,37 @@ public class ContainersMonitorImpl extends AbstractService implements
     vmemCheckEnabled = this.conf.getBoolean(
         YarnConfiguration.NM_VMEM_CHECK_ENABLED,
         YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
+    elasticMemoryEnforcement = this.conf.getBoolean(
+        YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED,
+        YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED);
+    strictMemoryEnforcement = conf.getBoolean(
+        YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
+        YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
     LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
     LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
+    LOG.info("Elastic memory control enabled: " + elasticMemoryEnforcement);
+    LOG.info("Strict memory control enabled: " + strictMemoryEnforcement);
+
+    if (elasticMemoryEnforcement) {
+      if (!CGroupElasticMemoryController.isAvailable()) {
+        // Test for availability outside the constructor
+        // to be able to write non-Linux unit tests for
+        // CGroupElasticMemoryController
+        throw new YarnException(
+            "CGroup Elastic Memory controller enabled but " +
+            "it is not available. Exiting.");
+      } else {
+        this.oomListenerThread = new CGroupElasticMemoryController(
+            conf,
+            context,
+            ResourceHandlerModule.getCGroupsHandler(),
+            pmemCheckEnabled,
+            vmemCheckEnabled,
+            pmemCheckEnabled ?
+                maxPmemAllottedForContainers : maxVmemAllottedForContainers
+        );
+      }
+    }
 
     containersMonitorEnabled =
         isContainerMonitorEnabled() && monitoringInterval > 0;
@@ -246,6 +281,9 @@ public class ContainersMonitorImpl extends AbstractService implements
     if (containersMonitorEnabled) {
       this.monitoringThread.start();
     }
+    if (oomListenerThread != null) {
+      oomListenerThread.start();
+    }
     super.serviceStart();
   }
 
@@ -259,6 +297,14 @@ public class ContainersMonitorImpl extends AbstractService implements
       } catch (InterruptedException e) {
         LOG.info("ContainersMonitorImpl monitoring thread interrupted");
       }
+      if (this.oomListenerThread != null) {
+        this.oomListenerThread.stopListening();
+        try {
+          this.oomListenerThread.join();
+        } finally {
+          this.oomListenerThread = null;
+        }
+      }
     }
     super.serviceStop();
   }
@@ -651,6 +697,10 @@ public class ContainersMonitorImpl extends AbstractService implements
                             ProcessTreeInfo ptInfo,
                             long currentVmemUsage,
                             long currentPmemUsage) {
+      if (elasticMemoryEnforcement || strictMemoryEnforcement) {
+        // We enforce the overall memory usage instead of individual containers
+        return;
+      }
       boolean isMemoryOverLimit = false;
       long vmemLimit = ptInfo.getVmemLimit();
       long pmemLimit = ptInfo.getPmemLimit();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
index 56b571b..5b911b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.executor;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
@@ -93,4 +94,44 @@ public final class ContainerSignalContext {
   public Signal getSignal() {
     return this.signal;
   }
+
+  /**
+   * Return true if pid, signal, container and user all match (null-safe).
+   * @param obj compare to this object
+   * @return whether both target the same pid, signal, container and user
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ContainerSignalContext) {
+      ContainerSignalContext other = (ContainerSignalContext)obj;
+      boolean ret =
+          (other.getPid() == null && getPid() == null) ||
+              (other.getPid() != null && getPid() != null &&
+                  other.getPid().equals(getPid()));
+      ret = ret &&
+          ((other.getSignal() == null && getSignal() == null) ||
+          (other.getSignal() != null && getSignal() != null &&
+              other.getSignal().equals(getSignal())));
+      ret = ret &&
+          ((other.getContainer() == null && getContainer() == null) ||
+          (other.getContainer() != null && getContainer() != null &&
+              other.getContainer().equals(getContainer())));
+      ret = ret &&
+          ((other.getUser() == null && getUser() == null) ||
+          (other.getUser() != null && getUser() != null &&
+              other.getUser().equals(getUser())));
+      return ret;
+    }
+    return super.equals(obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().
+        append(getPid()).
+        append(getSignal()).
+        append(getContainer()).
+        append(getUser()).
+        toHashCode();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
new file mode 100644
index 0000000..0086b26
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <sys/param.h>
+#include <poll.h>
+#include "oom_listener.h"
+
+/*
+ * Print `file`, a space, then the printf-style formatted message to stderr.
+ */
+static inline void print_error(const char *file, const char *message,
+                        ...) {
+  fprintf(stderr, "%s ", file);
+  va_list arguments;
+  va_start(arguments, message);
+  vfprintf(stderr, message, arguments);
+  va_end(arguments);
+}
+
+/*
+ * Listen to OOM events in a memory cgroup. See declaration for details.
+ */
+int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd) {
+  const char *pattern =
+          cgroup[MAX(strlen(cgroup), 1) - 1] == '/'
+          ? "%s%s" :"%s/%s";
+
+  /* Create an event handle, if we do not have one already*/
+  if (descriptors->event_fd == -1 &&
+      (descriptors->event_fd = eventfd(0, 0)) == -1) {
+    print_error(descriptors->command, "eventfd() failed. errno:%d %s\n",
+                errno, strerror(errno));
+    return EXIT_FAILURE;
+  }
+
+  /*
+   * Open memory.oom_control and register it with the event handle through
+   * cgroup.event_control. A negative snprintf result cast to size_t is
+   * huge, so the ">= sizeof" checks catch both errors and truncation.
+   */
+  if ((size_t) snprintf(descriptors->event_control_path,
+               sizeof(descriptors->event_control_path),
+               pattern,
+               cgroup, "cgroup.event_control") >=
+      sizeof(descriptors->event_control_path)) {
+    print_error(descriptors->command, "path too long %s\n", cgroup);
+    return EXIT_FAILURE;
+  }
+
+  if ((descriptors->event_control_fd = open(
+      descriptors->event_control_path,
+      O_WRONLY|O_CREAT, 0600)) == -1) {
+    print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
+                descriptors->event_control_path,
+                errno, strerror(errno));
+    return EXIT_FAILURE;
+  }
+
+  if ((size_t) snprintf(descriptors->oom_control_path,
+               sizeof(descriptors->oom_control_path),
+               pattern,
+               cgroup, "memory.oom_control") >=
+      sizeof(descriptors->oom_control_path)) {
+    print_error(descriptors->command, "path too long %s\n", cgroup);
+    return EXIT_FAILURE;
+  }
+
+  if ((descriptors->oom_control_fd = open(
+      descriptors->oom_control_path,
+      O_RDONLY)) == -1) {
+    print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
+                descriptors->oom_control_path,
+                errno, strerror(errno));
+    return EXIT_FAILURE;
+  }
+
+  if ((descriptors->oom_command_len = (size_t) snprintf(
+      descriptors->oom_command,
+      sizeof(descriptors->oom_command),
+      "%d %d",
+      descriptors->event_fd,
+      descriptors->oom_control_fd)) >= sizeof(descriptors->oom_command)) {
+    print_error(descriptors->command, "Could not print %d %d\n",
+                descriptors->event_fd,
+                descriptors->oom_control_fd);
+    return EXIT_FAILURE;
+  }
+
+  if (write(descriptors->event_control_fd,
+            descriptors->oom_command,
+            descriptors->oom_command_len) == -1) {
+    print_error(descriptors->command, "Could not write to %s errno:%d\n",
+                descriptors->event_control_path, errno);
+    return EXIT_FAILURE;
+  }
+
+  if (close(descriptors->event_control_fd) == -1) {
+    print_error(descriptors->command, "Could not close %s errno:%d\n",
+                descriptors->event_control_path, errno);
+    return EXIT_FAILURE;
+  }
+  descriptors->event_control_fd = -1;
+
+  /*
+   * Listen to events as long as the cgroup exists
+   * and forward them to the fd in the argument.
+   */
+  for (;;) {
+    uint64_t u;
+    ssize_t ret = 0;
+    struct stat stat_buffer = {0};
+    struct pollfd poll_fd = {
+        .fd = descriptors->event_fd,
+        .events = POLLIN
+    };
+
+    ret = poll(&poll_fd, 1, descriptors->watch_timeout);
+    if (ret < 0) {
+      /* Error calling poll */
+      print_error(descriptors->command,
+                  "Could not poll eventfd %d errno:%d %s\n", (int) ret,
+                  errno, strerror(errno));
+      return EXIT_FAILURE;
+    }
+
+    if (ret > 0) {
+      /* Event counter values are always 8 bytes */
+      if ((ret = read(descriptors->event_fd, &u, sizeof(u))) != sizeof(u)) {
+        print_error(descriptors->command,
+                    "Could not read from eventfd %d errno:%d %s\n", (int) ret,
+                    errno, strerror(errno));
+        return EXIT_FAILURE;
+      }
+
+      /* Forward the value to the caller, typically stdout */
+      if ((ret = write(fd, &u, sizeof(u))) != sizeof(u)) {
+        print_error(descriptors->command,
+                    "Could not write to pipe %d errno:%d %s\n", (int) ret,
+                    errno, strerror(errno));
+        return EXIT_FAILURE;
+      }
+    } else if (ret == 0) {
+      /* Timeout has elapsed*/
+
+      /* Quit, if the cgroup is deleted */
+      if (stat(cgroup, &stat_buffer) != 0) {
+        break;
+      }
+    }
+  }
+  return EXIT_SUCCESS;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
new file mode 100644
index 0000000..aa77cb6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/eventfd.h>
+#include <sys/stat.h>
+
+#include <linux/limits.h>
+
+/*
+This file implements a standard cgroups out of memory listener.
+*/
+
+/*
+ * State that oom_listener() works with. The caller owns the structure,
+ * initializes it before the call and passes it to cleanup() afterwards.
+ * Note: the unit test initializes the members with designated
+ * initializers in declaration order, so keep the member order stable.
+ */
+typedef struct _oom_listener_descriptors {
+  /*
+   * Command line that was called to run this process.
+   * It is handed to print_error() when a failure is reported.
+   */
+  const char *command;
+  /*
+   * Event descriptor to watch.
+   * oom_listener() polls it and reads 8 byte counter values from it.
+   * It is filled in by the function if it is not specified yet
+   * (callers pass -1 in that case); the unit test may pass a mock
+   * eventfd instead.
+   */
+  int event_fd;
+  /*
+   * cgroup.event_control file handle
+   */
+  int event_control_fd;
+  /*
+   * memory.oom_control file handle
+   */
+  int oom_control_fd;
+  /*
+   * cgroup.event_control path
+   */
+  char event_control_path[PATH_MAX];
+  /*
+   * memory.oom_control path
+   */
+  char oom_control_path[PATH_MAX];
+  /*
+   * Control command to write to
+   * cgroup.event_control
+   * Filled by the function.
+   */
+  char oom_command[25];
+  /*
+   * Length of oom_command filled by the function.
+   */
+  size_t oom_command_len;
+  /*
+   * Directory watch timeout.
+   * It is passed to poll() as the timeout argument, so the unit is
+   * milliseconds. Whenever it elapses without an event, oom_listener()
+   * checks whether the cgroup directory still exists.
+   */
+  int watch_timeout;
+} _oom_listener_descriptors;
+
+/*
+ * Clean up allocated resources in a descriptor structure.
+ * Every descriptor that is still open is closed and marked as
+ * closed (-1); the watch timeout is reset to 1000.
+ * descriptors: Structure to clean up. It must not be NULL
+ *
+ * The function is static inline on purpose: a plain C99 "inline"
+ * definition in a header does not provide an external definition, so
+ * any call that the compiler decides not to inline (for example in an
+ * unoptimized build) fails to link with an undefined reference.
+ */
+static inline void cleanup(_oom_listener_descriptors *descriptors) {
+  /* Skip descriptors that were never opened to avoid a spurious EBADF */
+  if (descriptors->event_fd >= 0) {
+    close(descriptors->event_fd);
+  }
+  descriptors->event_fd = -1;
+  if (descriptors->event_control_fd >= 0) {
+    close(descriptors->event_control_fd);
+  }
+  descriptors->event_control_fd = -1;
+  if (descriptors->oom_control_fd >= 0) {
+    close(descriptors->oom_control_fd);
+  }
+  descriptors->oom_control_fd = -1;
+  descriptors->watch_timeout = 1000;
+}
+
+/*
+ * Enable an OOM listener on a memory cgroup.
+ * descriptors: Structure that holds the listener state. It is exposed
+ *              to the caller for cleanup and for testing purposes
+ * cgroup: cgroup path to watch. It has to be a memory cgroup
+ * fd: File to forward events to. Normally this is stdout
+ * returns: EXIT_SUCCESS after the watched cgroup directory has
+ *          disappeared, EXIT_FAILURE on error (for example when
+ *          polling, reading or forwarding an event fails)
+ */
+int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd);
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
new file mode 100644
index 0000000..eb7fc3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+
+#include "oom_listener.h"
+
+/*
+ Print the command line help to the standard error and
+ terminate the process with EXIT_FAILURE.
+ This function does not return.
+*/
+void print_usage(void) {
+  /* Every line is newline terminated so that the title, the */
+  /* description and the first usage line do not run together */
+  fprintf(stderr, "oom-listener\n");
+  fprintf(stderr, "Listen to OOM events in a cgroup\n");
+  fprintf(stderr, "usage to listen: oom-listener <cgroup directory>\n");
+  fprintf(stderr, "usage to test: oom-listener oom [<pgid>]\n");
+  fprintf(stderr, "example listening: oom-listener /sys/fs/cgroup/memory/hadoop-yarn | xxd -c 8\n");
+  fprintf(stderr, "example oom to test: bash -c 'echo $$ >/sys/fs/cgroup/memory/hadoop-yarn/tasks;oom-listener oom'\n");
+  fprintf(stderr, "example container overload: sudo -u <user> bash -c 'echo $$ && oom-listener oom 0' >/sys/fs/cgroup/memory/hadoop-yarn/<container>/tasks\n");
+  exit(EXIT_FAILURE);
+}
+
+/*
+  Memory hog that can be used to test the OOM listener.
+  If pgids is not NULL, it is parsed as a process group id and
+  the current process is moved into that process group first.
+  After that 4096 byte blocks are allocated in an endless loop.
+  The process exits with status 1 as soon as malloc fails.
+  See the usage examples above.
+*/
+void test_oom_infinite(char* pgids) {
+  if (pgids != NULL) {
+    setpgid(0, atoi(pgids));
+  }
+  for (;;) {
+    char* block = (char*)malloc(4096);
+    if (block == NULL) {
+      exit(1);
+    }
+    /* Touch the block, presumably to make the allocation count */
+    block[0] = 0xFF;
+  }
+}
+
+/*
+ A command that receives a memory cgroup directory and
+ listens to the events in the directory.
+ Events are forwarded to the standard output; the listener
+ writes the 8 byte counter value it reads for every event.
+ usage:
+ oom-listener <cgroup>
+ If the first argument is "oom", the memory hog test_oom_infinite
+ is run instead, with the optional second argument as pgid.
+*/
+int main(int argc, char *argv[]) {
+  const int test_mode = argc >= 2 && strcmp(argv[1], "oom") == 0;
+  if (test_mode) {
+    char *pgid = NULL;
+    if (argc >= 3) {
+      pgid = argv[2];
+    }
+    /* Loops until the process is killed or runs out of memory */
+    test_oom_infinite(pgid);
+  }
+
+  if (argc != 2) {
+    /* Exits the process */
+    print_usage();
+  }
+
+  const char *cgroup_path = argv[1];
+  _oom_listener_descriptors descriptors = {
+      .command = argv[0],
+      .event_fd = -1,
+      .event_control_fd = -1,
+      .oom_control_fd = -1,
+      .event_control_path = {0},
+      .oom_control_path = {0},
+      .oom_command = {0},
+      .oom_command_len = 0,
+      .watch_timeout = 1000
+  };
+
+  const int exit_code =
+      oom_listener(&descriptors, cgroup_path, STDOUT_FILENO);
+
+  cleanup(&descriptors);
+
+  return exit_code;
+}
+
+#else
+
+/*
+ This tool uses Linux specific functionality,
+ so it is not available for other operating systems.
+ The stub only returns a non-zero exit status.
+*/
+int main(void) {
+  return 1;
+}
+
+#endif
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
new file mode 100644
index 0000000..9627632
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+extern "C" {
+#include "oom_listener.h"
+}
+
+#include <gtest/gtest.h>
+#include <sys/wait.h>
+#include <fstream>
+#include <mutex>
+
+#define CGROUP_ROOT "/sys/fs/cgroup/memory/"
+#define TEST_ROOT "/tmp/test-oom-listener/"
+#define CGROUP_TASKS "tasks"
+#define CGROUP_OOM_CONTROL "memory.oom_control"
+#define CGROUP_LIMIT_PHYSICAL "memory.limit_in_bytes"
+#define CGROUP_LIMIT_SWAP "memory.memsw.limit_in_bytes"
+#define CGROUP_EVENT_CONTROL "cgroup.event_control"
+#define CGROUP_LIMIT (5 * 1024 * 1024)
+
+// We try multiple cgroup directories
+// We try first the official path to test
+// in production
+// If we are running as a user we fall back
+// to mock cgroup
+static const char *cgroup_candidates[] = { CGROUP_ROOT, TEST_ROOT };
+
+// Entry point of the test binary: lets Google Test consume its command
+// line flags, then runs every registered test and returns the result.
+int main(int argc, char **argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  const int result = RUN_ALL_TESTS();
+  return result;
+}
+
+// Fixture that prepares a uniquely named cgroup directory for a test
+// and removes it again afterwards.
+class OOMListenerTest : public ::testing::Test {
+private:
+  // Path of the test cgroup directory including the trailing '/'.
+  // Stays empty until SetUp() has formatted a candidate path.
+  char cgroup[PATH_MAX] = {};
+  // Root of the candidate that SetUp() tried last.
+  const char* cgroup_root = nullptr;
+public:
+  OOMListenerTest() = default;
+
+  virtual ~OOMListenerTest() = default;
+
+  // Returns the path of the cgroup directory set up for the test.
+  virtual const char* GetCGroup() { return cgroup; }
+
+  // Walks through cgroup_candidates in order and stops at the first
+  // root below which a new directory can be created. The test fails
+  // if the resulting path cannot be found with stat().
+  virtual void SetUp() {
+    struct stat dir_info = {};
+    const unsigned int candidate_count =
+        GTEST_ARRAY_SIZE_(cgroup_candidates);
+    for (unsigned int index = 0; index < candidate_count; ++index) {
+      cgroup_root = cgroup_candidates[index];
+
+      // Best effort creation of the root: the call is allowed to fail,
+      // since we might lack permission or the root may exist already
+      mkdir(cgroup_root, 0700);
+
+      const bool root_missing = stat(cgroup_root, &dir_info) != 0;
+      if (root_missing) {
+        printf("%s missing. Skipping test\n", cgroup_root);
+        continue;
+      }
+
+      timespec now = {};
+      const int clock_result = clock_gettime(CLOCK_MONOTONIC, &now);
+      if (clock_result != 0) {
+        ASSERT_TRUE(false) << " clock_gettime failed\n";
+      }
+
+      // Name the cgroup after the nanosecond part of the current
+      // time to make it quasi unique
+      const int written = snprintf(cgroup, sizeof(cgroup), "%s%lx/",
+                                   cgroup_root, now.tv_nsec);
+      if (written <= 0) {
+        cgroup[0] = '\0';
+        printf("%s snprintf failed\n", cgroup_root);
+        continue;
+      }
+
+      if (mkdir(cgroup, 0700) == 0) {
+        // This candidate is usable
+        break;
+      }
+      printf("%s not writable.\n", cgroup);
+    }
+
+    ASSERT_EQ(0, stat(cgroup, &dir_info))
+                  << "Cannot use or simulate cgroup " << cgroup;
+  }
+
+  // Removes the test cgroup directory. The root directory is removed
+  // as well, unless it is the first candidate.
+  virtual void TearDown() {
+    const bool has_cgroup = cgroup[0] != '\0';
+    if (has_cgroup) {
+      rmdir(cgroup);
+    }
+    const bool is_fallback_root =
+        cgroup_root != nullptr && cgroup_root != cgroup_candidates[0];
+    if (is_fallback_root) {
+      rmdir(cgroup_root);
+    }
+  }
+};
+
+/*
+  Unit test for cgroup testing. There are two modes.
+  If the unit test is run as root and we have cgroups
+  we try to create a cgroup and generate an OOM.
+  If we are not running as root we just sleep instead of
+  hogging memory and simulate the OOM by sending
+  an event in a mock event fd mock_oom_event_as_user.
+*/
+TEST_F(OOMListenerTest, test_oom) {
+  // Disable OOM killer
+  std::ofstream oom_control;
+  std::string oom_control_file =
+      std::string(GetCGroup()).append(CGROUP_OOM_CONTROL);
+  oom_control.open(oom_control_file.c_str(), oom_control.out);
+  oom_control << 1 << std::endl;
+  oom_control.close();
+
+  // Set a low enough limit for physical
+  std::ofstream limit;
+  std::string limit_file =
+      std::string(GetCGroup()).append(CGROUP_LIMIT_PHYSICAL);
+  limit.open(limit_file.c_str(), limit.out);
+  limit << CGROUP_LIMIT << std::endl;
+  limit.close();
+
+  // Set a low enough limit for physical + swap
+  std::ofstream limitSwap;
+  std::string limit_swap_file =
+      std::string(GetCGroup()).append(CGROUP_LIMIT_SWAP);
+  limitSwap.open(limit_swap_file.c_str(), limitSwap.out);
+  limitSwap << CGROUP_LIMIT << std::endl;
+  limitSwap.close();
+
+  // Event control file to set
+  std::string memory_control_file =
+      std::string(GetCGroup()).append(CGROUP_EVENT_CONTROL);
+
+  // Tasks file to check
+  std::string tasks_file =
+      std::string(GetCGroup()).append(CGROUP_TASKS);
+
+  int mock_oom_event_as_user = -1;
+  struct stat stat1 = {};
+  if (0 != stat(memory_control_file.c_str(), &stat1)) {
+    // We cannot tamper with cgroups
+    // running as a user, so simulate an
+    // oom event
+    mock_oom_event_as_user = eventfd(0, 0);
+  }
+  const int simulate_cgroups =
+      mock_oom_event_as_user != -1;
+
+  // Number of bytes of an eventfd counter value, as a signed type
+  // so that it can be compared with the results of read and write
+  const ssize_t event_size = static_cast<ssize_t>(sizeof(uint64_t));
+
+  pid_t mem_hog_pid = fork();
+  if (!mem_hog_pid) {
+    // Child process to consume too much memory
+    if (simulate_cgroups) {
+      std::cout << "Simulating cgroups OOM" << std::endl;
+      for (;;) {
+        sleep(1);
+      }
+    } else {
+      // Wait until we are added to the cgroup
+      // so that it is accounted for our mem
+      // usage
+      // Start from a defined value: the extraction below leaves the
+      // variable untouched when the tasks file cannot be opened
+      pid_t cgroupPid = 0;
+      do {
+        std::ifstream tasks;
+        tasks.open(tasks_file.c_str(), tasks.in);
+        tasks >> cgroupPid;
+        tasks.close();
+      } while (cgroupPid != getpid());
+
+      // Start consuming as much memory as we can.
+      // cgroup will stop us at CGROUP_LIMIT
+      const int bufferSize = 1024 * 1024;
+      std::cout << "Consuming too much memory" << std::endl;
+      for (;;) {
+        auto buffer = (char *) malloc(bufferSize);
+        if (buffer != nullptr) {
+          for (int i = 0; i < bufferSize; ++i) {
+            buffer[i] = (char) std::rand();
+          }
+        }
+      }
+    }
+  } else {
+    // Parent test
+    ASSERT_GE(mem_hog_pid, 1) << "Fork failed " << errno;
+
+    // Put child into cgroup
+    std::ofstream tasks;
+    tasks.open(tasks_file.c_str(), tasks.out);
+    tasks << mem_hog_pid << std::endl;
+    tasks.close();
+
+    // Create pipe to get forwarded eventfd
+    int test_pipe[2];
+    ASSERT_EQ(0, pipe(test_pipe));
+
+    // Launch OOM listener
+    // There is no race condition with the process
+    // running out of memory. If oom is 1 at startup
+    // oom_listener will send an initial notification
+    pid_t listener = fork();
+    if (listener == 0) {
+      // child listener forwarding cgroup events
+      _oom_listener_descriptors descriptors = {
+          .command = "test",
+          .event_fd = mock_oom_event_as_user,
+          .event_control_fd = -1,
+          .oom_control_fd = -1,
+          .event_control_path = {0},
+          .oom_control_path = {0},
+          .oom_command = {0},
+          .oom_command_len = 0,
+          .watch_timeout = 100
+      };
+      int ret = oom_listener(&descriptors, GetCGroup(), test_pipe[1]);
+      cleanup(&descriptors);
+      close(test_pipe[0]);
+      close(test_pipe[1]);
+      exit(ret);
+    } else {
+      // Parent test
+      uint64_t event_id = 1;
+      if (simulate_cgroups) {
+        // We cannot tamper with cgroups
+        // running as a user, so simulate an
+        // oom event
+        ASSERT_EQ(event_size,
+                  write(mock_oom_event_as_user,
+                        &event_id,
+                        sizeof(event_id)));
+      }
+      ASSERT_EQ(event_size,
+                read(test_pipe[0],
+                     &event_id,
+                     sizeof(event_id)))
+                    << "The event has not arrived";
+      close(test_pipe[0]);
+      close(test_pipe[1]);
+
+      // Simulate OOM killer
+      ASSERT_EQ(0, kill(mem_hog_pid, SIGKILL));
+
+      // Verify that process was killed.
+      // The status has to be collected through a real pointer:
+      // passing a null status to wait() discards it, so nothing
+      // could be verified about how the child ended
+      int mem_hog_status = 0;
+      pid_t exited0 = wait(&mem_hog_status);
+      ASSERT_EQ(mem_hog_pid, exited0)
+        << "Wrong process exited";
+      ASSERT_TRUE(WIFSIGNALED(mem_hog_status))
+        << "Test process killed with invalid status";
+      ASSERT_EQ(SIGKILL, WTERMSIG(mem_hog_status))
+        << "Test process killed with invalid status";
+
+      if (mock_oom_event_as_user != -1) {
+        ASSERT_EQ(0, unlink(oom_control_file.c_str()));
+        ASSERT_EQ(0, unlink(limit_file.c_str()));
+        ASSERT_EQ(0, unlink(limit_swap_file.c_str()));
+        ASSERT_EQ(0, unlink(tasks_file.c_str()));
+        ASSERT_EQ(0, unlink(memory_control_file.c_str()));
+      }
+      // Once the cgroup is empty delete it
+      ASSERT_EQ(0, rmdir(GetCGroup()))
+                << "Could not delete cgroup " << GetCGroup();
+
+      // Check that oom_listener exited on the deletion of the cgroup
+      int oom_listener_status = 0;
+      pid_t exited1 = wait(&oom_listener_status);
+      ASSERT_EQ(listener, exited1)
+        << "Wrong process exited";
+      ASSERT_TRUE(WIFEXITED(oom_listener_status))
+        << "Listener process exited with invalid status";
+      ASSERT_EQ(EXIT_SUCCESS, WEXITSTATUS(oom_listener_status))
+        << "Listener process exited with invalid status";
+    }
+  }
+}
+
+#else
+/*
+This tool covers Linux specific functionality,
+so it is not available for other operating systems.
+On those platforms no test is run and the binary just returns 1.
+*/
+int main() {
+  return 1;
+}
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
new file mode 100644
index 0000000..54bcb13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Runnable that does not do anything.
+ * The constructor accepts a context and a flag, and ignores both.
+ */
+public class DummyRunnableWithContext implements Runnable {
+
+  /**
+   * Creates the runnable. Neither argument is stored or used.
+   *
+   * @param context node manager context, ignored
+   * @param virtual flag, ignored
+   */
+  public DummyRunnableWithContext(Context context, boolean virtual) {
+    // Nothing to initialize.
+  }
+
+  /**
+   * Returns immediately without doing any work.
+   */
+  @Override
+  public void run() {
+    // Intentionally empty.
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org