You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/23 23:35:18 UTC

[1/2] hadoop git commit: YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)

Repository: hadoop
Updated Branches:
  refs/heads/trunk f09dc7300 -> d99647995


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupElasticMemoryController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupElasticMemoryController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupElasticMemoryController.java
new file mode 100644
index 0000000..118d172
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupElasticMemoryController.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for elastic non-strict memory controller based on cgroups.
+ */
+public class TestCGroupElasticMemoryController {
+  private YarnConfiguration conf = new YarnConfiguration();
+  private File script = new File("target/" +
+      TestCGroupElasticMemoryController.class.getName());
+
+  /**
+   * Test that at least one memory type is requested.
+   * @throws YarnException on exception
+   */
+  @Test(expected = YarnException.class)
+  public void testConstructorOff()
+      throws YarnException {
+    CGroupElasticMemoryController controller =
+        new CGroupElasticMemoryController(
+            conf,
+            null,
+            null,
+            false,
+            false,
+            10000
+        );
+  }
+
+  /**
+   * Test that the OOM logic is pluggable.
+   * @throws YarnException on exception
+   */
+  @Test
+  public void testConstructorHandler()
+      throws YarnException {
+    conf.setClass(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER,
+        DummyRunnableWithContext.class, Runnable.class);
+    CGroupsHandler handler = mock(CGroupsHandler.class);
+    when(handler.getPathForCGroup(any(), any())).thenReturn("");
+    CGroupElasticMemoryController controller =
+        new CGroupElasticMemoryController(
+            conf,
+            null,
+            handler,
+            true,
+            false,
+            10000
+        );
+  }
+
+  /**
+   * Test that the handler is notified about multiple OOM events.
+   * @throws Exception on exception
+   */
+  @Test
+  public void testMultipleOOMEvents() throws Exception {
+    conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+        script.getAbsolutePath());
+    try {
+      FileUtils.writeStringToFile(script,
+          "#!/bin/bash\nprintf oomevent;printf oomevent;\n",
+          Charset.defaultCharset(), false);
+      assertTrue("Could not set executable",
+          script.setExecutable(true));
+
+      CGroupsHandler cgroups = mock(CGroupsHandler.class);
+      when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
+      when(cgroups.getCGroupParam(any(), any(), any()))
+          .thenReturn("under_oom 0");
+
+      Runnable handler = mock(Runnable.class);
+      doNothing().when(handler).run();
+
+      CGroupElasticMemoryController controller =
+          new CGroupElasticMemoryController(
+              conf,
+              null,
+              cgroups,
+              true,
+              false,
+              10000,
+              handler
+          );
+      controller.run();
+      verify(handler, times(2)).run();
+    } finally {
+      assertTrue(String.format("Could not clean up script %s",
+          script.getAbsolutePath()), script.delete());
+    }
+  }
+
+  /**
+   * Test the scenario that the controller is stopped before.
+   * the child process starts
+   * @throws Exception one exception
+   */
+  @Test
+  public void testStopBeforeStart() throws Exception {
+    conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+        script.getAbsolutePath());
+    try {
+      FileUtils.writeStringToFile(script,
+          "#!/bin/bash\nprintf oomevent;printf oomevent;\n",
+          Charset.defaultCharset(), false);
+      assertTrue("Could not set executable",
+          script.setExecutable(true));
+
+      CGroupsHandler cgroups = mock(CGroupsHandler.class);
+      when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
+      when(cgroups.getCGroupParam(any(), any(), any()))
+          .thenReturn("under_oom 0");
+
+      Runnable handler = mock(Runnable.class);
+      doNothing().when(handler).run();
+
+      CGroupElasticMemoryController controller =
+          new CGroupElasticMemoryController(
+              conf,
+              null,
+              cgroups,
+              true,
+              false,
+              10000,
+              handler
+          );
+      controller.stopListening();
+      controller.run();
+      verify(handler, times(0)).run();
+    } finally {
+      assertTrue(String.format("Could not clean up script %s",
+          script.getAbsolutePath()), script.delete());
+    }
+  }
+
+  /**
+   * Test the edge case that OOM is never resolved.
+   * @throws Exception on exception
+   */
+  @Test(expected = YarnRuntimeException.class)
+  public void testInfiniteOOM() throws Exception {
+    conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+        script.getAbsolutePath());
+    Runnable handler = mock(Runnable.class);
+    try {
+      FileUtils.writeStringToFile(script,
+          "#!/bin/bash\nprintf oomevent;sleep 1000;\n",
+          Charset.defaultCharset(), false);
+      assertTrue("Could not set executable",
+          script.setExecutable(true));
+
+      CGroupsHandler cgroups = mock(CGroupsHandler.class);
+      when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
+      when(cgroups.getCGroupParam(any(), any(), any()))
+          .thenReturn("under_oom 1");
+
+      doNothing().when(handler).run();
+
+      CGroupElasticMemoryController controller =
+          new CGroupElasticMemoryController(
+              conf,
+              null,
+              cgroups,
+              true,
+              false,
+              10000,
+              handler
+          );
+      controller.run();
+    } finally {
+      verify(handler, times(1)).run();
+      assertTrue(String.format("Could not clean up script %s",
+          script.getAbsolutePath()), script.delete());
+    }
+  }
+
+  /**
+   * Test the edge case that OOM cannot be resolved due to the lack of
+   * containers.
+   * @throws Exception on exception
+   */
+  @Test(expected = YarnRuntimeException.class)
+  public void testNothingToKill() throws Exception {
+    conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+        script.getAbsolutePath());
+    Runnable handler = mock(Runnable.class);
+    try {
+      FileUtils.writeStringToFile(script,
+          "#!/bin/bash\nprintf oomevent;sleep 1000;\n",
+          Charset.defaultCharset(), false);
+      assertTrue("Could not set executable",
+          script.setExecutable(true));
+
+      CGroupsHandler cgroups = mock(CGroupsHandler.class);
+      when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
+      when(cgroups.getCGroupParam(any(), any(), any()))
+          .thenReturn("under_oom 1");
+
+      doThrow(new YarnRuntimeException("Expected")).when(handler).run();
+
+      CGroupElasticMemoryController controller =
+          new CGroupElasticMemoryController(
+              conf,
+              null,
+              cgroups,
+              true,
+              false,
+              10000,
+              handler
+          );
+      controller.run();
+    } finally {
+      verify(handler, times(1)).run();
+      assertTrue(String.format("Could not clean up script %s",
+          script.getAbsolutePath()), script.delete());
+    }
+  }
+
+  /**
+   * Test that node manager can exit listening.
+   * This is done by running a long running listener for 10 seconds.
+   * Then we wait for 2 seconds and stop listening.
+   * @throws Exception exception occurred
+   */
+  @Test
+  public void testNormalExit() throws Exception {
+    conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+        script.getAbsolutePath());
+    try {
+      FileUtils.writeStringToFile(script,
+          "#!/bin/bash\nsleep 10000;\n",
+          Charset.defaultCharset(), false);
+      assertTrue("Could not set executable",
+          script.setExecutable(true));
+
+      CGroupsHandler cgroups = mock(CGroupsHandler.class);
+      when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
+      when(cgroups.getCGroupParam(any(), any(), any()))
+          .thenReturn("under_oom 0");
+
+      Runnable handler = mock(Runnable.class);
+      doNothing().when(handler).run();
+
+      CGroupElasticMemoryController controller =
+          new CGroupElasticMemoryController(
+              conf,
+              null,
+              cgroups,
+              true,
+              false,
+              10000,
+              handler
+          );
+      ExecutorService service = Executors.newFixedThreadPool(1);
+      service.submit(() -> {
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException ex) {
+          assertTrue("Wait interrupted.", false);
+        }
+        controller.stopListening();
+      });
+      controller.run();
+    } finally {
+      assertTrue(String.format("Could not clean up script %s",
+          script.getAbsolutePath()), script.delete());
+    }
+  }
+
+  /**
+   * Test that DefaultOOMHandler is instantiated correctly in
+   * the elastic constructor.
+   * @throws YarnException Could not set up elastic memory control.
+   */
+  @Test
+  public void testDefaultConstructor() throws YarnException{
+    CGroupsHandler handler = mock(CGroupsHandler.class);
+    when(handler.getPathForCGroup(any(), any())).thenReturn("");
+    CGroupElasticMemoryController controller =
+        new CGroupElasticMemoryController(
+            conf, null, handler, true, false, 10);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java
index 416b4fd..5c7e233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java
@@ -65,17 +65,15 @@ public class TestCGroupsMemoryResourceHandlerImpl {
     conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
     try {
       cGroupsMemoryResourceHandler.bootstrap(conf);
-      Assert.fail("Pmem check should not be allowed to run with cgroups");
     } catch(ResourceHandlerException re) {
-      // do nothing
+      Assert.fail("Pmem check should be allowed to run with cgroups");
     }
     conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
     conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
     try {
       cGroupsMemoryResourceHandler.bootstrap(conf);
-      Assert.fail("Vmem check should not be allowed to run with cgroups");
     } catch(ResourceHandlerException re) {
-      // do nothing
+      Assert.fail("Vmem check should be allowed to run with cgroups");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
new file mode 100644
index 0000000..60c38fe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test default out of memory handler.
+ */
+public class TestDefaultOOMHandler {
+
+  /**
+   * Test an OOM situation where no containers are running.
+   */
+  @Test(expected = YarnRuntimeException.class)
+  public void testNoContainers() throws Exception {
+    Context context = mock(Context.class);
+
+    when(context.getContainers()).thenReturn(new ConcurrentHashMap<>());
+
+    CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+    when(cGroupsHandler.getCGroupParam(
+        CGroupsHandler.CGroupController.MEMORY,
+        "",
+        CGROUP_PARAM_MEMORY_OOM_CONTROL))
+        .thenReturn("under_oom 1").thenReturn("under_oom 0");
+
+    DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
+    handler.setCGroupsHandler(cGroupsHandler);
+
+    handler.run();
+  }
+
+  /**
+   * We have two containers, both out of limit. We should kill the later one.
+   *
+   * @throws Exception exception
+   */
+  @Test
+  public void testBothContainersOOM() throws Exception {
+    ConcurrentHashMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<>(new LinkedHashMap<>());
+
+    Container c1 = mock(Container.class);
+    ContainerId cid1 = createContainerId(1);
+    when(c1.getContainerId()).thenReturn(cid1);
+    when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
+    when(c1.getContainerStartTime()).thenReturn((long) 1);
+    containers.put(createContainerId(1), c1);
+
+    Container c2 = mock(Container.class);
+    ContainerId cid2 = createContainerId(2);
+    when(c2.getContainerId()).thenReturn(cid2);
+    when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
+    when(c2.getContainerStartTime()).thenReturn((long) 2);
+    containers.put(cid2, c2);
+
+    CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_FILE_TASKS))
+        .thenReturn("1234").thenReturn("");
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+        .thenReturn(getMB(11));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+        .thenReturn(getMB(11));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_FILE_TASKS))
+        .thenReturn("1235").thenReturn("");
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+        .thenReturn(getMB(11));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+        .thenReturn(getMB(11));
+
+    ContainerExecutor ex = mock(ContainerExecutor.class);
+
+    runOOMHandler(containers, cGroupsHandler, ex);
+
+    verify(ex, times(1)).signalContainer(
+        new ContainerSignalContext.Builder()
+            .setPid("1235")
+            .setContainer(c2)
+            .setSignal(ContainerExecutor.Signal.KILL)
+            .build()
+    );
+    verify(ex, times(1)).signalContainer(any());
+  }
+
+  /**
+   * We have two containers, one out of limit. We should kill that one.
+   * This should happen even, if it was started earlier
+   *
+   * @throws Exception exception
+   */
+  @Test
+  public void testOneContainerOOM() throws Exception {
+    ConcurrentHashMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<>(new LinkedHashMap<>());
+
+    Container c1 = mock(Container.class);
+    ContainerId cid1 = createContainerId(1);
+    when(c1.getContainerId()).thenReturn(cid1);
+    when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
+    when(c1.getContainerStartTime()).thenReturn((long) 2);
+    containers.put(createContainerId(1), c1);
+
+    Container c2 = mock(Container.class);
+    ContainerId cid2 = createContainerId(2);
+    when(c2.getContainerId()).thenReturn(cid2);
+    when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
+    when(c2.getContainerStartTime()).thenReturn((long) 1);
+    containers.put(cid2, c2);
+
+    CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_FILE_TASKS))
+        .thenReturn("1234").thenReturn("");
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+        .thenReturn(getMB(9));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+        .thenReturn(getMB(9));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_FILE_TASKS))
+        .thenReturn("1235").thenReturn("");
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+        .thenReturn(getMB(11));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+        .thenReturn(getMB(11));
+
+    ContainerExecutor ex = mock(ContainerExecutor.class);
+    runOOMHandler(containers, cGroupsHandler, ex);
+
+    verify(ex, times(1)).signalContainer(
+        new ContainerSignalContext.Builder()
+            .setPid("1235")
+            .setContainer(c2)
+            .setSignal(ContainerExecutor.Signal.KILL)
+            .build()
+    );
+    verify(ex, times(1)).signalContainer(any());
+  }
+
+  /**
+   * We have two containers, neither out of limit. We should kill the later one.
+   *
+   * @throws Exception exception
+   */
+  @Test
+  public void testNoContainerOOM() throws Exception {
+    ConcurrentHashMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<>(new LinkedHashMap<>());
+
+    Container c1 = mock(Container.class);
+    ContainerId cid1 = createContainerId(1);
+    when(c1.getContainerId()).thenReturn(cid1);
+    when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
+    when(c1.getContainerStartTime()).thenReturn((long) 1);
+    containers.put(createContainerId(1), c1);
+
+    Container c2 = mock(Container.class);
+    ContainerId cid2 = createContainerId(2);
+    when(c2.getContainerId()).thenReturn(cid2);
+    when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
+    when(c2.getContainerStartTime()).thenReturn((long) 2);
+    containers.put(cid2, c2);
+
+    CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_FILE_TASKS))
+        .thenReturn("1234").thenReturn("");
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+        .thenReturn(getMB(9));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+        .thenReturn(getMB(9));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_FILE_TASKS))
+        .thenReturn("1235").thenReturn("");
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+        .thenReturn(getMB(9));
+    when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+        cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+        .thenReturn(getMB(9));
+
+    ContainerExecutor ex = mock(ContainerExecutor.class);
+    runOOMHandler(containers, cGroupsHandler, ex);
+
+    verify(ex, times(1)).signalContainer(
+        new ContainerSignalContext.Builder()
+            .setPid("1235")
+            .setContainer(c2)
+            .setSignal(ContainerExecutor.Signal.KILL)
+            .build()
+    );
+    verify(ex, times(1)).signalContainer(any());
+  }
+
+  private void runOOMHandler(
+      ConcurrentHashMap<ContainerId, Container> containers,
+      CGroupsHandler cGroupsHandler, ContainerExecutor ex)
+      throws IOException, ResourceHandlerException {
+    Context context = mock(Context.class);
+    when(context.getContainers()).thenReturn(containers);
+
+    when(ex.signalContainer(any()))
+        .thenAnswer(invocation -> {
+          assertEquals("Wrong pid killed", "1235",
+              ((ContainerSignalContext) invocation.getArguments()[0]).getPid());
+          return true;
+        });
+
+    when(cGroupsHandler.getCGroupParam(
+        CGroupsHandler.CGroupController.MEMORY,
+        "",
+        CGROUP_PARAM_MEMORY_OOM_CONTROL))
+        .thenReturn("under_oom 1").thenReturn("under_oom 0");
+
+    when(context.getContainerExecutor()).thenReturn(ex);
+
+    DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
+    handler.setCGroupsHandler(cGroupsHandler);
+
+    handler.run();
+  }
+
+  private class AppId extends ApplicationIdPBImpl {
+    AppId(long clusterTs, int appId) {
+      this.setClusterTimestamp(clusterTs);
+      this.setId(appId);
+    }
+  }
+
+  private ContainerId createContainerId(int id) {
+    ApplicationId applicationId = new AppId(1, 1);
+
+    ApplicationAttemptId applicationAttemptId
+        = mock(ApplicationAttemptId.class);
+    when(applicationAttemptId.getApplicationId()).thenReturn(applicationId);
+    when(applicationAttemptId.getAttemptId()).thenReturn(1);
+
+    ContainerId containerId = mock(ContainerId.class);
+    when(containerId.toString()).thenReturn(Integer.toString(id));
+    when(containerId.getContainerId()).thenReturn(new Long(1));
+
+    return containerId;
+  }
+
+  ContainerTokenIdentifier getToken() {
+    ContainerTokenIdentifier id = mock(ContainerTokenIdentifier.class);
+    when(id.getVersion()).thenReturn(1);
+    return id;
+  }
+
+  String getMB(long mb) {
+    return Long.toString(mb * 1024 * 1024);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
index 412b8cd..2882b32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
@@ -94,6 +94,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
         YarnConfiguration.NM_MON_RESOURCE_CALCULATOR,
         LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
     conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false);
     super.setup();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index c5fdccd..8aee532 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -174,9 +174,10 @@ public class TestContainersMonitorResourceChange {
   }
 
   @Test
-  public void testContainersResourceChange() throws Exception {
+  public void testContainersResourceChangePolling() throws Exception {
     // set container monitor interval to be 20ms
     conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L);
+    conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false);
     containersMonitor = createContainersMonitor(executor, dispatcher, context);
     containersMonitor.init(conf);
     containersMonitor.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManagerCGroupsMemory.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManagerCGroupsMemory.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManagerCGroupsMemory.md
new file mode 100644
index 0000000..ec93234
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManagerCGroupsMemory.md
@@ -0,0 +1,133 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+Using Memory Control in YARN
+=======================
+
+YARN has multiple features to enforce container memory limits. There are three types of controls in YARN that can be used.
+1. The polling feature monitors periodically measures container memory usage and kills the containers that exceed their limits. This is a legacy feature with some issues notably a delay that may lead to node shutdown.
+2. Strict memory control kills each container that has exceeded its limits. It is using the OOM killer capability of the cgroups Linux kernel feature.
+3. Elastic memory control is also based on cgroups. It allows bursting and starts killing containers only, if the overall system memory usage reaches a limit.
+
+If you use 2. or 3. feature 1. is disabled.
+
+Strict Memory Feature
+---------------------
+
+cgroups can be used to preempt containers in case of out-of-memory events. This feature leverages cgroups to clean up containers with the kernel when this happens. If your container exited with exit code `137`, then ou can verify the cause in `/var/log/messages`.
+
+Elastic Memory Feature
+----------------------
+
+The cgroups kernel feature has the ability to notify the node manager, if the parent cgroup of all containers specified by `yarn.nodemanager.linux-container-executor.cgroups.hierarchy` goes over a memory limit. The YARN feature that uses this ability is called elastic memory control. The benefits are that containers can burst using more memory than they are reserved to. This is allowed as long as we do not exceed the overall memory limit. When the limit is reached the kernel freezes all the containers and notifies the node manager. The node manager chooses a container and preempts it. It continues this step until the node is resumed from the OOM condition.
+
+The Limit for Elastic Memory Control
+---------
+
+The limit is the amount of memory allocated to all the containers on the node. The limit is specified by `yarn.nodemanager.resource.memory-mb` and `yarn.nodemanager.vmem-pmem-ratio`. If these are not set, the limit is set based on the available resources. See `yarn.nodemanager.resource.detect-hardware-capabilities` for details.
+
+The pluggable preemption logic
+------------------------------
+
+The preemption logic specifies which container to preempt in a node wide out-of-memory situation. The default logic is the `DefaultOOMHandler`. It picks the latest container that exceeded its memory limit. In the unlikely case that no such container is found, it preempts the container that was launched most recently. This continues until the OOM condition is resolved. This logic supports bursting, when containers use more memory than they reserved as long as we have memory available. This helps to improve the overall cluster utilization. The logic ensures that as long as a container is within its limit, it won't get preempted. If the container bursts it can be preempted. There is a case that all containers are within their limits but we are out of memory. This can also happen in case of oversubscription. We prefer preemting the latest containers to minimize the cost and value lost. Once preempted, the data in the container is lost.
+
+The default out-of-memory handler can be updated using `yarn.nodemanager.elastic-memory-control.oom-handler`. The class named in this configuration entry has to implement java.lang.Runnable. The `run()` function will be called in a node level out-of-memory situation. The constructor should accept an `NmContext` object.
+
+Physical and virtual memory control
+----------------------------------
+
+In case of Elastic Memory Control, the limit applies to the physical or virtual (rss+swap in cgroups) memory depending on whether `yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` is set.
+
+There is no reason to set them both. If the system runs with swap disabled, both will have the same number. If swap is enabled the virtual memory counter will account for pages in physical memory and on the disk. This is what the application allocated and it has control over. The limit should be applied to the virtual memory in this case. When swapping is enabled, the physical memory is no more than the virtual memory and it is adjusted by the kernel not just by the container. There is no point preempting a container when it exceeds a physical memory limit with swapping. The system will just swap out some memory, when needed.
+
+Virtual memory measurement and swapping
+--------------------------------------------
+
+There is a difference between the virtual memory reported by the container monitor and the virtual memory limit specified in the elastic memory control feature. The container monitor uses `ProcfsBasedProcessTree` by default for measurements that returns values from the `proc` file system. The virtual memory returned is the size of the address space of all the processes in each container. This includes anonymous pages, pages swapped out to disk, mapped files and reserved pages among others. Reserved pages are not backed by either physical or swapped memory. They can be a large part of the virtual memory usage. The reservabe address space was limited on 32 bit processors but it is very large on 64-bit ones making this metric less useful. Some Java Virtual Machines reserve large amounts of pages but they do not actually use it. This will result in gigabytes of virtual memory usage shown. However, this does not mean that anything is wrong with the container.
+
+Because of this you can now use `CGroupsResourceCalculator`. This shows only the sum of the physical memory usage and swapped pages as virtual memory usage excluding the reserved address space. This reflects much better what the application and the container allocated.
+
+In order to enable cgroups based resource calculation set `yarn.nodemanager.resource-calculator.class` to `org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsResourceCalculator`.
+
+Configuration quickstart
+------------------------
+
+The following levels of memory enforcement are available and supported:
+
+Level | Configuration type | Options
+---|---|---
+0 | No memory control | All settings below are false
+1 | Strict Container Memory enforcement through polling | P or V
+2 | Strict Container Memory enforcement through cgroups | CG, C and (P or V)
+3 | Elastic Memory Control through cgroups | CG, E and (P or V)
+
+The symbols above mean that the respective configuration entries are `true`:
+
+P: `yarn.nodemanager.pmem-check-enabled`
+
+V: `yarn.nodemanager.vmem-check-enabled`
+
+C: `yarn.nodemanager.resource.memory.enforced`
+
+E: `yarn.nodemanager.elastic-memory-control.enabled`
+
+cgroups prerequisites
+---------------------
+
+CG: C and E require the following prerequisites:
+1. `yarn.nodemanager.container-executor.class` should be `org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor`.
+2. `yarn.nodemanager.runtime.linux.allowed-runtimes` should at least be `default`.
+3. `yarn.nodemanager.resource.memory.enabled` should be `true`
+
+Configuring no memory control
+-----------------------------
+
+`yarn.nodemanager.pmem-check-enabled` and `yarn.nodemanager.vmem-check-enabled` should be `false`.
+
+`yarn.nodemanager.resource.memory.enforced` should be `false`.
+
+`yarn.nodemanager.elastic-memory-control.enabled` should be `false`.
+
+Configuring strict container memory enforcement with polling without cgroups
+----------------------------------------------------------------
+
+`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true`.
+
+`yarn.nodemanager.resource.memory.enforced` should be `false`.
+
+`yarn.nodemanager.elastic-memory-control.enabled` should be `false`.
+
+Configuring strict container memory enforcement with cgroups
+------------------------------------------------------------
+
+Strict memory control preempts containers right away using the OOM killer feature of the kernel, when they reach their physical or virtual memory limits. You need to set the following options on top of the prerequisites above to use strict memory control.
+
+Configure the cgroups prerequisites mentioned above.
+
+`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true`. You can set them both. **Currently this is ignored by the code, only physical limits can be selected.**
+
+`yarn.nodemanager.resource.memory.enforced` should be true
+
+Configuring elastic memory resource control
+------------------------------------------
+
+The cgroups based elastic memory control preempts containers only if the overall system memory usage reaches its limit allowing bursting. This feature requires setting the following options on top of the prerequisites.
+
+Configure the cgroups prerequisites mentioned above.
+
+`yarn.nodemanager.elastic-memory-control.enabled` should be `true`.
+
+`yarn.nodemanager.resource.memory.enforced` should be `false`
+
+`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true`. If swapping is turned off the former should be set, the latter should be set otherwise.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)

Posted by ha...@apache.org.
YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9964799
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9964799
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9964799

Branch: refs/heads/trunk
Commit: d9964799544eefcf424fcc178d987525f5356cdf
Parents: f09dc73
Author: Haibo Chen <ha...@apache.org>
Authored: Wed May 23 11:29:55 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Wed May 23 16:35:37 2018 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  26 +-
 .../src/main/resources/yarn-default.xml         |  67 ++-
 .../src/CMakeLists.txt                          |  19 +
 .../CGroupElasticMemoryController.java          | 476 +++++++++++++++++++
 .../linux/resources/CGroupsHandler.java         |   6 +
 .../linux/resources/CGroupsHandlerImpl.java     |   6 +-
 .../CGroupsMemoryResourceHandlerImpl.java       |  15 -
 .../linux/resources/DefaultOOMHandler.java      | 254 ++++++++++
 .../monitor/ContainersMonitorImpl.java          |  50 ++
 .../executor/ContainerSignalContext.java        |  41 ++
 .../native/oom-listener/impl/oom_listener.c     | 171 +++++++
 .../native/oom-listener/impl/oom_listener.h     | 102 ++++
 .../oom-listener/impl/oom_listener_main.c       | 104 ++++
 .../oom-listener/test/oom_listener_test_main.cc | 292 ++++++++++++
 .../resources/DummyRunnableWithContext.java     |  31 ++
 .../TestCGroupElasticMemoryController.java      | 319 +++++++++++++
 .../TestCGroupsMemoryResourceHandlerImpl.java   |   6 +-
 .../linux/resources/TestDefaultOOMHandler.java  | 307 ++++++++++++
 .../monitor/TestContainersMonitor.java          |   1 +
 .../TestContainersMonitorResourceChange.java    |   3 +-
 .../site/markdown/NodeManagerCGroupsMemory.md   | 133 ++++++
 22 files changed, 2391 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 934c009..428950b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,7 @@
 target
 build
 dependency-reduced-pom.xml
+make-build-debug
 
 # Filesystem contract test options and credentials
 auth-keys.xml

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8e56cb8..6d08831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1440,6 +1440,25 @@ public class YarnConfiguration extends Configuration {
     NM_PREFIX + "vmem-pmem-ratio";
   public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;
 
+  /** Specifies whether to do memory check on overall usage. */
+  public static final String NM_ELASTIC_MEMORY_CONTROL_ENABLED = NM_PREFIX
+      + "elastic-memory-control.enabled";
+  public static final boolean DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED = false;
+
+  /** Specifies the OOM handler code. */
+  public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER = NM_PREFIX
+      + "elastic-memory-control.oom-handler";
+
+  /** The path to the OOM listener.*/
+  public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH =
+      NM_PREFIX + "elastic-memory-control.oom-listener.path";
+
+  /** Maximum time in seconds to resolve an OOM situation. */
+  public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC =
+      NM_PREFIX + "elastic-memory-control.timeout-sec";
+  public static final Integer
+      DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = 5;
+
   /** Number of Virtual CPU Cores which can be allocated for containers.*/
   public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
   public static final int DEFAULT_NM_VCORES = 8;
@@ -2006,13 +2025,6 @@ public class YarnConfiguration extends Configuration {
   /** The path to the Linux container executor.*/
   public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
     NM_PREFIX + "linux-container-executor.path";
-  
-  /** 
-   * The UNIX group that the linux-container-executor should run as.
-   * This is intended to be set as part of container-executor.cfg. 
-   */
-  public static final String NM_LINUX_CONTAINER_GROUP =
-    NM_PREFIX + "linux-container-executor.group";
 
   /**
    * True if linux-container-executor should limit itself to one user

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 156ca24..da44ccb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -772,7 +772,7 @@
   <property>
     <description>Maximum size in bytes for configurations that can be provided
       by application to RM for delegation token renewal.
-      By experiment, it's roughly 128 bytes per key-value pair.
+      By experiment, its roughly 128 bytes per key-value pair.
       The default value 12800 allows roughly 100 configs, may be less.
     </description>
     <name>yarn.resourcemanager.delegation-token.max-conf-size-bytes</name>
@@ -1860,14 +1860,6 @@
   </property>
 
   <property>
-    <description>
-    The UNIX group that the linux-container-executor should run as.
-    </description>
-    <name>yarn.nodemanager.linux-container-executor.group</name>
-    <value></value>
-  </property>
-
-  <property>
     <description>T-file compression types used to compress aggregated logs.</description>
     <name>yarn.nodemanager.log-aggregation.compression-type</name>
     <value>none</value>
@@ -2158,7 +2150,7 @@
     <description>
     In the server side it indicates whether timeline service is enabled or not.
     And in the client side, users can enable it to indicate whether client wants
-    to use timeline service. If it's enabled in the client side along with
+    to use timeline service. If its enabled in the client side along with
     security, then yarn client tries to fetch the delegation tokens for the
     timeline server.
     </description>
@@ -3404,7 +3396,7 @@
     <description>
       Defines the limit of the diagnostics message of an application
       attempt, in kilo characters (character count * 1024).
-      When using ZooKeeper to store application state behavior, it's
+      When using ZooKeeper to store application state behavior, its
       important to limit the size of the diagnostic messages to
       prevent YARN from overwhelming ZooKeeper. In cases where
       yarn.resourcemanager.state-store.max-completed-applications is set to
@@ -3819,4 +3811,57 @@
     <value>/usr/bin/numactl</value>
   </property>
 
+  <property>
+    <description>
+      Enable elastic memory control. This is a Linux only feature.
+      When enabled, the node manager adds a listener to receive an
+      event, if all the containers exceeded a limit.
+      The limit is specified by yarn.nodemanager.resource.memory-mb.
+      If this is not set, the limit is set based on the capabilities.
+      See yarn.nodemanager.resource.detect-hardware-capabilities
+      for details.
+      The limit applies to the physical or virtual (rss+swap) memory
+      depending on whether yarn.nodemanager.pmem-check-enabled or
+      yarn.nodemanager.vmem-check-enabled is set.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      The name of a JVM class. The class must implement the Runnable
+      interface. It is called,
+      if yarn.nodemanager.elastic-memory-control.enabled
+      is set and the system reaches its memory limit.
+      When called the handler must preempt a container,
+      since all containers are frozen by cgroups.
+      Once preempted some memory is released, so that the
+      kernel can resume all containers. Because of this the
+      handler has to act quickly.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.oom-handler</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.DefaultOOMHandler</value>
+  </property>
+
+  <property>
+    <description>
+      The path to the oom-listener tool. Elastic memory control is only
+      supported on Linux. It relies on kernel events. The tool forwards
+      these kernel events to the standard input, so that the node manager
+      can preempt containers, in and out-of-memory scenario.
+      You rarely need to update this setting.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.oom-listener.path</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
+      Maximum time to wait for an OOM situation to get resolved before
+      bringing down the node.
+    </description>
+    <name>yarn.nodemanager.elastic-memory-control.timeout-sec</name>
+    <value>5</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
index 79faeec..a614f80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
@@ -30,6 +30,7 @@ string(REGEX MATCH . HCD_ONE "${HADOOP_CONF_DIR}")
 string(COMPARE EQUAL ${HCD_ONE} / HADOOP_CONF_DIR_IS_ABS)
 
 set (CMAKE_C_STANDARD 99)
+set (CMAKE_CXX_STANDARD 11)
 
 include(CheckIncludeFiles)
 check_include_files("sys/types.h;sys/sysctl.h" HAVE_SYS_SYSCTL_H)
@@ -113,6 +114,7 @@ include_directories(
     ${GTEST_SRC_DIR}/include
     main/native/container-executor
     main/native/container-executor/impl
+    main/native/oom-listener/impl
 )
 # add gtest as system library to suppress gcc warnings
 include_directories(SYSTEM ${GTEST_SRC_DIR}/include)
@@ -171,3 +173,20 @@ add_executable(cetest
         main/native/container-executor/test/utils/test_docker_util.cc)
 target_link_libraries(cetest gtest container)
 output_directory(cetest test)
+
+# CGroup OOM listener
+add_executable(oom-listener
+        main/native/oom-listener/impl/oom_listener.c
+        main/native/oom-listener/impl/oom_listener.h
+        main/native/oom-listener/impl/oom_listener_main.c
+)
+output_directory(oom-listener target/usr/local/bin)
+
+# CGroup OOM listener test with GTest
+add_executable(test-oom-listener
+        main/native/oom-listener/impl/oom_listener.c
+        main/native/oom-listener/impl/oom_listener.h
+        main/native/oom-listener/test/oom_listener_test_main.cc
+)
+target_link_libraries(test-oom-listener gtest)
+output_directory(test-oom-listener test)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
new file mode 100644
index 0000000..752c3a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+import java.io.File;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.nio.charset.Charset;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_NO_LIMIT;
+
+/**
+ * This thread controls memory usage using cgroups. It listens to out of memory
+ * events of all the containers together, and if we go over the limit picks
+ * a container to kill. The algorithm that picks the container is a plugin.
+ */
+public class CGroupElasticMemoryController extends Thread {
+  protected static final Log LOG = LogFactory
+      .getLog(CGroupElasticMemoryController.class);
+  private final Clock clock = new MonotonicClock();
+  private String yarnCGroupPath;
+  private String oomListenerPath;
+  private Runnable oomHandler;
+  private CGroupsHandler cgroups;
+  private boolean controlPhysicalMemory;
+  private boolean controlVirtualMemory;
+  private long limit;
+  private Process process = null;
+  private boolean stopped = false;
+  private int timeoutMS;
+
+  /**
+   * Default constructor.
+   * @param conf Yarn configuration to use
+   * @param context Node manager context to out of memory handler
+   * @param cgroups Cgroups handler configured
+   * @param controlPhysicalMemory Whether to listen to physical memory OOM
+   * @param controlVirtualMemory Whether to listen to virtual memory OOM
+   * @param limit memory limit in bytes
+   * @param oomHandlerOverride optional OOM handler
+   * @exception YarnException Could not instantiate class
+   */
+  @VisibleForTesting
+  CGroupElasticMemoryController(Configuration conf,
+                                       Context context,
+                                       CGroupsHandler cgroups,
+                                       boolean controlPhysicalMemory,
+                                       boolean controlVirtualMemory,
+                                       long limit,
+                                       Runnable oomHandlerOverride)
+      throws YarnException {
+    super("CGroupElasticMemoryController");
+    boolean controlVirtual = controlVirtualMemory && !controlPhysicalMemory;
+    Runnable oomHandlerTemp =
+        getDefaultOOMHandler(conf, context, oomHandlerOverride, controlVirtual);
+    if (controlPhysicalMemory && controlVirtualMemory) {
+      LOG.warn(
+          NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
+          "We cannot control both virtual and physical " +
+          "memory at the same time. Enforcing virtual memory. " +
+          "If swapping is enabled set " +
+          "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
+          "only " + NM_VMEM_CHECK_ENABLED + " to true.");
+    }
+    if (!controlPhysicalMemory && !controlVirtualMemory) {
+      throw new YarnException(
+          NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
+              "We need either virtual or physical memory check requested. " +
+              "If swapping is enabled set " +
+              "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
+              "only " + NM_VMEM_CHECK_ENABLED + " to true.");
+    }
+    // We are safe at this point that no more exceptions can be thrown
+    this.timeoutMS =
+        1000 * conf.getInt(NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC,
+        DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC);
+    this.oomListenerPath = getOOMListenerExecutablePath(conf);
+    this.oomHandler = oomHandlerTemp;
+    this.cgroups = cgroups;
+    this.controlPhysicalMemory = !controlVirtual;
+    this.controlVirtualMemory = controlVirtual;
+    this.yarnCGroupPath = this.cgroups
+        .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, "");
+    this.limit = limit;
+  }
+
+  /**
+   * Get the configured OOM handler.
+   * @param conf configuration
+   * @param context context to pass to constructor
+   * @param oomHandlerLocal Default override
+   * @param controlVirtual Control physical or virtual memory
+   * @return The configured or overridden OOM handler.
+   * @throws YarnException in case the constructor failed
+   */
+  private Runnable getDefaultOOMHandler(
+      Configuration conf, Context context, Runnable oomHandlerLocal,
+      boolean controlVirtual)
+      throws YarnException {
+    Class oomHandlerClass =
+        conf.getClass(
+            YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER,
+            DefaultOOMHandler.class);
+    if (oomHandlerLocal == null) {
+      try {
+        Constructor constr = oomHandlerClass.getConstructor(
+            Context.class, boolean.class);
+        oomHandlerLocal = (Runnable)constr.newInstance(
+            context, controlVirtual);
+      } catch (Exception ex) {
+        throw new YarnException(ex);
+      }
+    }
+    return oomHandlerLocal;
+  }
+
+  /**
+   * Default constructor.
+   * @param conf Yarn configuration to use
+   * @param context Node manager context to out of memory handler
+   * @param cgroups Cgroups handler configured
+   * @param controlPhysicalMemory Whether to listen to physical memory OOM
+   * @param controlVirtualMemory Whether to listen to virtual memory OOM
+   * @param limit memory limit in bytes
+   * @exception YarnException Could not instantiate class
+   */
+  public CGroupElasticMemoryController(Configuration conf,
+                                       Context context,
+                                       CGroupsHandler cgroups,
+                                       boolean controlPhysicalMemory,
+                                       boolean controlVirtualMemory,
+                                       long limit)
+      throws YarnException {
+    this(conf,
+        context,
+        cgroups,
+        controlPhysicalMemory,
+        controlVirtualMemory,
+        limit,
+        null);
+  }
+
+  /**
+   * Exception thrown if the OOM situation is not resolved.
+   */
+  static private class OOMNotResolvedException extends YarnRuntimeException {
+    OOMNotResolvedException(String message, Exception parent) {
+      super(message, parent);
+    }
+  }
+
+  /**
+   * Stop listening to the cgroup.
+   */
+  public synchronized void stopListening() {
+    stopped = true;
+    if (process != null) {
+      process.destroyForcibly();
+    } else {
+      LOG.warn("Trying to stop listening, when listening is not running");
+    }
+  }
+
+  /**
+   * Checks if the CGroupElasticMemoryController is available on this system.
+   * This assumes that Linux container executor is already initialized.
+   * We need to have CGroups enabled.
+   *
+   * @return True if CGroupElasticMemoryController is available.
+   * False otherwise.
+   */
+  public static boolean isAvailable() {
+    try {
+      if (!Shell.LINUX) {
+        LOG.info("CGroupElasticMemoryController currently is supported only "
+            + "on Linux.");
+        return false;
+      }
+      if (ResourceHandlerModule.getCGroupsHandler() == null ||
+          ResourceHandlerModule.getMemoryResourceHandler() == null) {
+        LOG.info("CGroupElasticMemoryController requires enabling " +
+            "memory CGroups with" +
+            YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED);
+        return false;
+      }
+    } catch (SecurityException se) {
+      LOG.info("Failed to get Operating System name. " + se);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Main OOM listening thread. It uses an external process to listen to
+   * Linux events. The external process does not need to run as root, so
+   * it is not related to container-executor. We do not use JNI for security
+   * reasons.
+   */
+  @Override
+  public void run() {
+    ExecutorService executor = null;
+    try {
+      // Disable OOM killer and set a limit.
+      // This has to be set first, so that we get notified about valid events.
+      // We will be notified about events even, if they happened before
+      // oom-listener started
+      setCGroupParameters();
+
+      // Start a listener process
+      ProcessBuilder oomListener = new ProcessBuilder();
+      oomListener.command(oomListenerPath, yarnCGroupPath);
+      synchronized (this) {
+        if (!stopped) {
+          process = oomListener.start();
+        } else {
+          resetCGroupParameters();
+          LOG.info("Listener stopped before starting");
+          return;
+        }
+      }
+      LOG.info(String.format("Listening on %s with %s",
+          yarnCGroupPath,
+          oomListenerPath));
+
+      // We need 1 thread for the error stream and a few others
+      // as a watchdog for the OOM killer
+      executor = Executors.newFixedThreadPool(2);
+
+      // Listen to any errors in the background. We do not expect this to
+      // be large in size, so it will fit into a string.
+      Future<String> errorListener = executor.submit(
+          () -> IOUtils.toString(process.getErrorStream(),
+              Charset.defaultCharset()));
+
+      // We get Linux event increments (8 bytes) forwarded from the event stream
+      // The events cannot be split, so it is safe to read them as a whole
+      // There is no race condition with the cgroup
+      // running out of memory. If oom is 1 at startup
+      // oom_listener will send an initial notification
+      InputStream events = process.getInputStream();
+      byte[] event = new byte[8];
+      int read;
+      // This loop can be exited by terminating the process
+      // with stopListening()
+      while ((read = events.read(event)) == event.length) {
+        // An OOM event has occurred
+        resolveOOM(executor);
+      }
+
+      if (read != -1) {
+        LOG.warn(String.format("Characters returned from event hander: %d",
+            read));
+      }
+
+      // If the input stream is closed, we wait for exit or process terminated.
+      int exitCode = process.waitFor();
+      String error = errorListener.get();
+      process = null;
+      LOG.info(String.format("OOM listener exited %d %s", exitCode, error));
+    } catch (OOMNotResolvedException ex) {
+      // We could mark the node unhealthy but it shuts down the node anyways.
+      // Let's just bring down the node manager all containers are frozen.
+      throw new YarnRuntimeException("Could not resolve OOM", ex);
+    } catch (Exception ex) {
+      synchronized (this) {
+        if (!stopped) {
+          LOG.warn("OOM Listener exiting.", ex);
+        }
+      }
+    } finally {
+      // Make sure we do not leak the child process,
+      // especially if process.waitFor() did not finish.
+      if (process != null && process.isAlive()) {
+        process.destroyForcibly();
+      }
+      if (executor != null) {
+        try {
+          executor.awaitTermination(6, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          LOG.warn("Exiting without processing all OOM events.");
+        }
+        executor.shutdown();
+      }
+      resetCGroupParameters();
+    }
+  }
+
+  /**
+   * Resolve an OOM event.
+   * Listen to the handler timeouts.
+   * @param executor Executor to create watchdog with.
+   * @throws InterruptedException interrupted
+   * @throws java.util.concurrent.ExecutionException cannot launch watchdog
+   */
+  private void resolveOOM(ExecutorService executor)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    // Just log, when we are still in OOM after a couple of seconds
+    final long start = clock.getTime();
+    Future<Boolean> watchdog =
+        executor.submit(() -> watchAndLogOOMState(start));
+    // Kill something to resolve the issue
+    try {
+      oomHandler.run();
+    } catch (RuntimeException ex) {
+      watchdog.cancel(true);
+      throw new OOMNotResolvedException("OOM handler failed", ex);
+    }
+    if (!watchdog.get()) {
+      // If we are still in OOM,
+      // the watchdog will trigger stop
+      // listening to exit this loop
+      throw new OOMNotResolvedException("OOM handler timed out", null);
+    }
+  }
+
+  /**
+   * Just watch until we are in OOM and log. Send an update log every second.
+   * @return if the OOM was resolved successfully
+   */
+  private boolean watchAndLogOOMState(long start) {
+    long lastLog = start;
+    try {
+      long end = start;
+      // Throw an error, if we are still in OOM after 5 seconds
+      while(end - start < timeoutMS) {
+        end = clock.getTime();
+        String underOOM = cgroups.getCGroupParam(
+            CGroupsHandler.CGroupController.MEMORY,
+            "",
+            CGROUP_PARAM_MEMORY_OOM_CONTROL);
+        if (underOOM.contains(CGroupsHandler.UNDER_OOM)) {
+          if (end - lastLog > 1000) {
+            LOG.warn(String.format(
+                "OOM not resolved in %d ms", end - start));
+            lastLog = end;
+          }
+        } else {
+          LOG.info(String.format(
+              "Resolved OOM in %d ms", end - start));
+          return true;
+        }
+        // We do not want to saturate the CPU
+        // leaving the resources to the actual OOM killer
+        // but we want to be fast, too.
+        Thread.sleep(10);
+      }
+    } catch (InterruptedException ex) {
+      LOG.debug("Watchdog interrupted");
+    } catch (Exception e) {
+      LOG.warn("Exception running logging thread", e);
+    }
+    LOG.warn(String.format("OOM was not resolved in %d ms",
+        clock.getTime() - start));
+    stopListening();
+    return false;
+  }
+
+  /**
+   * Update root memory cgroup. This contains all containers.
+   * The physical limit has to be set first then the virtual limit.
+   */
+  private void setCGroupParameters() throws ResourceHandlerException {
+    // Disable the OOM killer
+    cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+        CGROUP_PARAM_MEMORY_OOM_CONTROL, "1");
+    if (controlPhysicalMemory && !controlVirtualMemory) {
+      try {
+        // Ignore virtual memory limits, since we do not know what it is set to
+        cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+            CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+      } catch (ResourceHandlerException ex) {
+        LOG.debug("Swap monitoring is turned off in the kernel");
+      }
+      // Set physical memory limits
+      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
+    } else if (controlVirtualMemory && !controlPhysicalMemory) {
+      // Ignore virtual memory limits, since we do not know what it is set to
+      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+          CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+      // Set physical limits to no more than virtual limits
+      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
+      // Set virtual memory limits
+      // Important: it has to be set after physical limit is set
+      cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+          CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, Long.toString(limit));
+    } else {
+      throw new ResourceHandlerException(
+          String.format("Unsupported scenario physical:%b virtual:%b",
+              controlPhysicalMemory, controlVirtualMemory));
+    }
+  }
+
+  /**
+   * Reset root memory cgroup to OS defaults. This controls all containers.
+   */
+  private void resetCGroupParameters() {
+    try {
+      try {
+        // Disable memory limits
+        cgroups.updateCGroupParam(
+            CGroupsHandler.CGroupController.MEMORY, "",
+            CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+      } catch (ResourceHandlerException ex) {
+        LOG.debug("Swap monitoring is turned off in the kernel");
+      }
+      cgroups.updateCGroupParam(
+          CGroupsHandler.CGroupController.MEMORY, "",
+          CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+      // Enable the OOM killer
+      cgroups.updateCGroupParam(
+          CGroupsHandler.CGroupController.MEMORY, "",
+          CGROUP_PARAM_MEMORY_OOM_CONTROL, "0");
+    } catch (ResourceHandlerException ex) {
+      LOG.warn("Error in cleanup", ex);
+    }
+  }
+
+  private static String getOOMListenerExecutablePath(Configuration conf) {
+    String yarnHomeEnvVar =
+        System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
+    if (yarnHomeEnvVar == null) {
+      yarnHomeEnvVar = ".";
+    }
+    File hadoopBin = new File(yarnHomeEnvVar, "bin");
+    String defaultPath =
+        new File(hadoopBin, "oom-listener").getAbsolutePath();
+    final String path = conf.get(
+        YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+        defaultPath);
+    LOG.debug(String.format("oom-listener path: %s %s", path, defaultPath));
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
index e279504..9dc16c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
@@ -76,8 +76,14 @@ public interface CGroupsHandler {
   String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
 
   String CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES = "limit_in_bytes";
+  String CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES = "memsw.limit_in_bytes";
   String CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES = "soft_limit_in_bytes";
+  String CGROUP_PARAM_MEMORY_OOM_CONTROL = "oom_control";
   String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness";
+  String CGROUP_PARAM_MEMORY_USAGE_BYTES = "usage_in_bytes";
+  String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
+  String CGROUP_NO_LIMIT = "-1";
+  String UNDER_OOM = "under_oom 1";
 
 
   String CGROUP_CPU_PERIOD_US = "cfs_period_us";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
index 008f3d7..6ed94e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
@@ -594,7 +594,11 @@ class CGroupsHandlerImpl implements CGroupsHandler {
   @Override
   public String getCGroupParam(CGroupController controller, String cGroupId,
       String param) throws ResourceHandlerException {
-    String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
+    String cGroupParamPath =
+        param.equals(CGROUP_FILE_TASKS) ?
+            getPathForCGroup(controller, cGroupId)
+                + Path.SEPARATOR + param :
+        getPathForCGroupParam(controller, cGroupId, param);
 
     try {
       byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
index 2d1585e..a57adb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
@@ -65,21 +65,6 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
   @Override
   public List<PrivilegedOperation> bootstrap(Configuration conf)
       throws ResourceHandlerException {
-    boolean pmemEnabled =
-        conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED,
-            YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
-    boolean vmemEnabled =
-        conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
-            YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
-    if (pmemEnabled || vmemEnabled) {
-      String msg = "The default YARN physical and/or virtual memory health"
-          + " checkers as well as the CGroups memory controller are enabled. "
-          + "If you wish to use the Cgroups memory controller, please turn off"
-          + " the default physical/virtual memory checkers by setting "
-          + YarnConfiguration.NM_PMEM_CHECK_ENABLED + " and "
-          + YarnConfiguration.NM_VMEM_CHECK_ENABLED + " to false.";
-      throw new ResourceHandlerException(msg);
-    }
     this.cGroupsHandler.initializeCGroupController(MEMORY);
     enforce = conf.getBoolean(
         YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
new file mode 100644
index 0000000..c690225
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
+
+/**
+ * A very basic OOM handler implementation.
+ * See the javadoc on the run() method for details.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DefaultOOMHandler implements Runnable {
+  protected static final Log LOG = LogFactory
+      .getLog(DefaultOOMHandler.class);
+  private Context context;
+  private boolean virtual;
+  private CGroupsHandler cgroups;
+
+  /**
+   * Create an OOM handler.
+   * This has to be public to be able to construct through reflection.
+   * @param context node manager context to work with
+   * @param testVirtual Test virtual memory or physical
+   */
+  public DefaultOOMHandler(Context context, boolean testVirtual) {
+    this.context = context;
+    this.virtual = testVirtual;
+    this.cgroups = ResourceHandlerModule.getCGroupsHandler();
+  }
+
+  @VisibleForTesting
+  void setCGroupsHandler(CGroupsHandler handler) {
+    cgroups = handler;
+  }
+
+  /**
+   * Kill the container, if it has exceeded its request.
+   *
+   * @param container Container to check
+   * @param fileName  CGroup filename (physical or swap/virtual)
+   * @return true, if the container was preempted
+   */
+  private boolean killContainerIfOOM(Container container, String fileName) {
+    String value = null;
+    try {
+      value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+          container.getContainerId().toString(),
+          fileName);
+      long usage = Long.parseLong(value);
+      long request = container.getResource().getMemorySize() * 1024 * 1024;
+
+      // Check if the container has exceeded its limits.
+      if (usage > request) {
+        // Kill the container
+        // We could call the regular cleanup but that sends a
+        // SIGTERM first that cannot be handled by frozen processes.
+        // Walk through the cgroup
+        // tasks file and kill all processes in it
+        sigKill(container);
+        String message = String.format(
+            "Container %s was killed by elastic cgroups OOM handler using %d " +
+                "when requested only %d",
+            container.getContainerId(), usage, request);
+        LOG.warn(message);
+        return true;
+      }
+    } catch (ResourceHandlerException ex) {
+      LOG.warn(String.format("Could not access memory resource for %s",
+          container.getContainerId()), ex);
+    } catch (NumberFormatException ex) {
+      LOG.warn(String.format("Could not parse %s in %s",
+          value, container.getContainerId()));
+    }
+    return false;
+  }
+
+  /**
+   * SIGKILL the specified container. We do this not using the standard
+   * container logic. The reason is that the processes are frozen by
+   * the cgroups OOM handler, so they cannot respond to SIGTERM.
+   * On the other hand we have to be as fast as possible.
+   * We walk through the list of active processes in the container.
+   * This is needed because frozen parents cannot signal their children.
+   * We kill each process and then try again until the whole cgroup
+   * is cleaned up. This logic avoids leaking processes in a cgroup.
+   * Currently the killing only succeeds for PGIDS.
+   *
+   * @param container Container to clean up
+   */
+  private void sigKill(Container container) {
+    boolean finished = false;
+    try {
+      while (!finished) {
+        String[] pids =
+            cgroups.getCGroupParam(
+                CGroupsHandler.CGroupController.MEMORY,
+                container.getContainerId().toString(),
+                CGROUP_FILE_TASKS)
+                .split("\n");
+        finished = true;
+        for (String pid : pids) {
+          // Note: this kills only PGIDs currently
+          if (pid != null && !pid.isEmpty()) {
+            LOG.debug(String.format(
+                "Terminating container %s Sending SIGKILL to -%s",
+                container.getContainerId().toString(),
+                pid));
+            finished = false;
+            try {
+              context.getContainerExecutor().signalContainer(
+                  new ContainerSignalContext.Builder().setContainer(container)
+                      .setUser(container.getUser())
+                      .setPid(pid).setSignal(ContainerExecutor.Signal.KILL)
+                      .build());
+            } catch (IOException ex) {
+              LOG.warn(String.format("Cannot kill container %s pid -%s.",
+                  container.getContainerId(), pid), ex);
+            }
+          }
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while waiting for processes to disappear");
+        }
+      }
+    } catch (ResourceHandlerException ex) {
+      LOG.warn(String.format(
+          "Cannot list more tasks in container %s to kill.",
+          container.getContainerId()));
+    }
+  }
+
+  /**
+   * It is called when the node is under an OOM condition. All processes in
+   * all sub-cgroups are suspended. We need to act fast, so that we do not
+   * affect the overall system utilization.
+   * In general we try to find a newly run container that exceeded its limits.
+   * The justification is cost, since probably this is the one that has
+   * accumulated the least amount of uncommitted data so far.
+   * We continue the process until the OOM is resolved.
+   */
+  @Override
+  public void run() {
+    try {
+      // Reverse order by start time
+      Comparator<Container> comparator = (Container o1, Container o2) -> {
+        long order = o1.getContainerStartTime() - o2.getContainerStartTime();
+        return order > 0 ? -1 : order < 0 ? 1 : 0;
+      };
+
+      // We kill containers until the kernel reports the OOM situation resolved
+      // Note: If the kernel has a delay this may kill more than necessary
+      while (true) {
+        String status = cgroups.getCGroupParam(
+            CGroupsHandler.CGroupController.MEMORY,
+            "",
+            CGROUP_PARAM_MEMORY_OOM_CONTROL);
+        if (!status.contains(CGroupsHandler.UNDER_OOM)) {
+          break;
+        }
+
+        // The first pass kills a recent container
+        // that uses more than its request
+        ArrayList<Container> containers = new ArrayList<>();
+        containers.addAll(context.getContainers().values());
+        // Note: Sorting may take a long time with 10K+ containers
+        // but it is acceptable now with low number of containers per node
+        containers.sort(comparator);
+
+        // Kill the latest container that exceeded its request
+        boolean found = false;
+        for (Container container : containers) {
+          if (!virtual) {
+            if (killContainerIfOOM(container,
+                CGROUP_PARAM_MEMORY_USAGE_BYTES)) {
+              found = true;
+              break;
+            }
+          } else {
+            if (killContainerIfOOM(container,
+                CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
+              found = true;
+              break;
+            }
+          }
+        }
+        if (found) {
+          continue;
+        }
+
+        // We have not found any containers that ran out of their limit,
+        // so we will kill the latest one. This can happen, if all use
+        // close to their request and one of them requests a big block
+        // triggering the OOM freeze.
+        // Currently there is no other way to identify the outstanding one.
+        if (containers.size() > 0) {
+          Container container = containers.get(0);
+          sigKill(container);
+          String message = String.format(
+              "Newest container %s killed by elastic cgroups OOM handler using",
+              container.getContainerId());
+          LOG.warn(message);
+          continue;
+        }
+
+        // This can happen, if SIGKILL did not clean up
+        // non-PGID or containers or containers launched by other users
+        // or if a process was put to the root YARN cgroup.
+        throw new YarnRuntimeException(
+            "Could not find any containers but CGroups " +
+                "reserved for containers ran out of memory. " +
+                "I am giving up");
+      }
+    } catch (ResourceHandlerException ex) {
+      LOG.warn("Could not fecth OOM status. " +
+          "This is expected at shutdown. Exiting.", ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 35015c2..bd68dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +67,7 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private long monitoringInterval;
   private MonitoringThread monitoringThread;
+  private CGroupElasticMemoryController oomListenerThread;
   private boolean containerMetricsEnabled;
   private long containerMetricsPeriodMs;
   private long containerMetricsUnregisterDelayMs;
@@ -85,6 +89,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private boolean pmemCheckEnabled;
   private boolean vmemCheckEnabled;
+  private boolean elasticMemoryEnforcement;
+  private boolean strictMemoryEnforcement;
   private boolean containersMonitorEnabled;
 
   private long maxVCoresAllottedForContainers;
@@ -173,8 +179,37 @@ public class ContainersMonitorImpl extends AbstractService implements
     vmemCheckEnabled = this.conf.getBoolean(
         YarnConfiguration.NM_VMEM_CHECK_ENABLED,
         YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
+    elasticMemoryEnforcement = this.conf.getBoolean(
+        YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED,
+        YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED);
+    strictMemoryEnforcement = conf.getBoolean(
+        YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
+        YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
     LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
     LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
+    LOG.info("Elastic memory control enabled: " + elasticMemoryEnforcement);
+    LOG.info("Strict memory control enabled: " + strictMemoryEnforcement);
+
+    if (elasticMemoryEnforcement) {
+      if (!CGroupElasticMemoryController.isAvailable()) {
+        // Test for availability outside the constructor
+        // to be able to write non-Linux unit tests for
+        // CGroupElasticMemoryController
+        throw new YarnException(
+            "CGroup Elastic Memory controller enabled but " +
+            "it is not available. Exiting.");
+      } else {
+        this.oomListenerThread = new CGroupElasticMemoryController(
+            conf,
+            context,
+            ResourceHandlerModule.getCGroupsHandler(),
+            pmemCheckEnabled,
+            vmemCheckEnabled,
+            pmemCheckEnabled ?
+                maxPmemAllottedForContainers : maxVmemAllottedForContainers
+        );
+      }
+    }
 
     containersMonitorEnabled =
         isContainerMonitorEnabled() && monitoringInterval > 0;
@@ -246,6 +281,9 @@ public class ContainersMonitorImpl extends AbstractService implements
     if (containersMonitorEnabled) {
       this.monitoringThread.start();
     }
+    if (oomListenerThread != null) {
+      oomListenerThread.start();
+    }
     super.serviceStart();
   }
 
@@ -259,6 +297,14 @@ public class ContainersMonitorImpl extends AbstractService implements
       } catch (InterruptedException e) {
         LOG.info("ContainersMonitorImpl monitoring thread interrupted");
       }
+      if (this.oomListenerThread != null) {
+        this.oomListenerThread.stopListening();
+        try {
+          this.oomListenerThread.join();
+        } finally {
+          this.oomListenerThread = null;
+        }
+      }
     }
     super.serviceStop();
   }
@@ -651,6 +697,10 @@ public class ContainersMonitorImpl extends AbstractService implements
                             ProcessTreeInfo ptInfo,
                             long currentVmemUsage,
                             long currentPmemUsage) {
+      if (elasticMemoryEnforcement || strictMemoryEnforcement) {
+        // We enforce the overall memory usage instead of individual containers
+        return;
+      }
       boolean isMemoryOverLimit = false;
       long vmemLimit = ptInfo.getVmemLimit();
       long pmemLimit = ptInfo.getPmemLimit();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
index 56b571b..5b911b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.executor;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
@@ -93,4 +94,44 @@ public final class ContainerSignalContext {
   public Signal getSignal() {
     return this.signal;
   }
+
+  /**
+   * Retrun true if we are trying to signal the same process.
+   * @param obj compare to this object
+   * @return whether we try to signal the same process id
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ContainerSignalContext) {
+      ContainerSignalContext other = (ContainerSignalContext)obj;
+      boolean ret =
+          (other.getPid() == null && getPid() == null) ||
+              (other.getPid() != null && getPid() != null &&
+                  other.getPid().equals(getPid()));
+      ret = ret &&
+          (other.getSignal() == null && getSignal() == null) ||
+          (other.getSignal() != null && getSignal() != null &&
+              other.getSignal().equals(getSignal()));
+      ret = ret &&
+          (other.getContainer() == null && getContainer() == null) ||
+          (other.getContainer() != null && getContainer() != null &&
+              other.getContainer().equals(getContainer()));
+      ret = ret &&
+          (other.getUser() == null && getUser() == null) ||
+          (other.getUser() != null && getUser() != null &&
+              other.getUser().equals(getUser()));
+      return ret;
+    }
+    return super.equals(obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().
+        append(getPid()).
+        append(getSignal()).
+        append(getContainer()).
+        append(getUser()).
+        toHashCode();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
new file mode 100644
index 0000000..0086b26
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <sys/param.h>
+#include <poll.h>
+#include "oom_listener.h"
+
+/*
+ * Print an error.
+*/
+static inline void print_error(const char *file, const char *message,
+                        ...) {
+  fprintf(stderr, "%s ", file);
+  va_list arguments;
+  va_start(arguments, message);
+  vfprintf(stderr, message, arguments);
+  va_end(arguments);
+}
+
+/*
+ * Listen to OOM events in a memory cgroup. See declaration for details.
+ */
+int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd) {
+  const char *pattern =
+          cgroup[MAX(strlen(cgroup), 1) - 1] == '/'
+          ? "%s%s" :"%s/%s";
+
+  /* Create an event handle, if we do not have one already*/
+  if (descriptors->event_fd == -1 &&
+      (descriptors->event_fd = eventfd(0, 0)) == -1) {
+    print_error(descriptors->command, "eventfd() failed. errno:%d %s\n",
+                errno, strerror(errno));
+    return EXIT_FAILURE;
+  }
+
+  /*
+   * open the file to listen to (memory.oom_control)
+   * and write the event handle and the file handle
+   * to cgroup.event_control
+   */
+  if (snprintf(descriptors->event_control_path,
+               sizeof(descriptors->event_control_path),
+               pattern,
+               cgroup,
+               "cgroup.event_control") < 0) {
+    print_error(descriptors->command, "path too long %s\n", cgroup);
+    return EXIT_FAILURE;
+  }
+
+  if ((descriptors->event_control_fd = open(
+      descriptors->event_control_path,
+      O_WRONLY|O_CREAT, 0600)) == -1) {
+    print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
+                descriptors->event_control_path,
+                errno, strerror(errno));
+    return EXIT_FAILURE;
+  }
+
+  if (snprintf(descriptors->oom_control_path,
+               sizeof(descriptors->oom_control_path),
+               pattern,
+               cgroup,
+               "memory.oom_control") < 0) {
+    print_error(descriptors->command, "path too long %s\n", cgroup);
+    return EXIT_FAILURE;
+  }
+
+  if ((descriptors->oom_control_fd = open(
+      descriptors->oom_control_path,
+      O_RDONLY)) == -1) {
+    print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
+                descriptors->oom_control_path,
+                errno, strerror(errno));
+    return EXIT_FAILURE;
+  }
+
+  if ((descriptors->oom_command_len = (size_t) snprintf(
+      descriptors->oom_command,
+      sizeof(descriptors->oom_command),
+      "%d %d",
+      descriptors->event_fd,
+      descriptors->oom_control_fd)) < 0) {
+    print_error(descriptors->command, "Could print %d %d\n",
+                descriptors->event_control_fd,
+                descriptors->oom_control_fd);
+    return EXIT_FAILURE;
+  }
+
+  if (write(descriptors->event_control_fd,
+            descriptors->oom_command,
+            descriptors->oom_command_len) == -1) {
+    print_error(descriptors->command, "Could not write to %s errno:%d\n",
+                descriptors->event_control_path, errno);
+    return EXIT_FAILURE;
+  }
+
+  if (close(descriptors->event_control_fd) == -1) {
+    print_error(descriptors->command, "Could not close %s errno:%d\n",
+                descriptors->event_control_path, errno);
+    return EXIT_FAILURE;
+  }
+  descriptors->event_control_fd = -1;
+
+  /*
+   * Listen to events as long as the cgroup exists
+   * and forward them to the fd in the argument.
+   */
+  for (;;) {
+    uint64_t u;
+    ssize_t ret = 0;
+    struct stat stat_buffer = {0};
+    struct pollfd poll_fd = {
+        .fd = descriptors->event_fd,
+        .events = POLLIN
+    };
+
+    ret = poll(&poll_fd, 1, descriptors->watch_timeout);
+    if (ret < 0) {
+      /* Error calling poll */
+      print_error(descriptors->command,
+                  "Could not poll eventfd %d errno:%d %s\n", ret,
+                  errno, strerror(errno));
+      return EXIT_FAILURE;
+    }
+
+    if (ret > 0) {
+      /* Event counter values are always 8 bytes */
+      if ((ret = read(descriptors->event_fd, &u, sizeof(u))) != sizeof(u)) {
+        print_error(descriptors->command,
+                    "Could not read from eventfd %d errno:%d %s\n", ret,
+                    errno, strerror(errno));
+        return EXIT_FAILURE;
+      }
+
+      /* Forward the value to the caller, typically stdout */
+      if ((ret = write(fd, &u, sizeof(u))) != sizeof(u)) {
+        print_error(descriptors->command,
+                    "Could not write to pipe %d errno:%d %s\n", ret,
+                    errno, strerror(errno));
+        return EXIT_FAILURE;
+      }
+    } else if (ret == 0) {
+      /* Timeout has elapsed*/
+
+      /* Quit, if the cgroup is deleted */
+      if (stat(cgroup, &stat_buffer) != 0) {
+        break;
+      }
+    }
+  }
+  return EXIT_SUCCESS;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
new file mode 100644
index 0000000..aa77cb6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/eventfd.h>
+#include <sys/stat.h>
+
+#include <linux/limits.h>
+
+/*
+This file implements a standard cgroups out of memory listener.
+*/
+
+typedef struct _oom_listener_descriptors {
+  /*
+   * Command line that was called to run this process.
+   */
+  const char *command;
+  /*
+   * Event descriptor to watch.
+   * It is filled in by the function,
+   * if not specified, yet.
+   */
+  int event_fd;
+  /*
+   * cgroup.event_control file handle
+   */
+  int event_control_fd;
+  /*
+   * memory.oom_control file handle
+   */
+  int oom_control_fd;
+  /*
+   * cgroup.event_control path
+   */
+  char event_control_path[PATH_MAX];
+  /*
+   * memory.oom_control path
+   */
+  char oom_control_path[PATH_MAX];
+  /*
+   * Control command to write to
+   * cgroup.event_control
+   * Filled by the function.
+   */
+  char oom_command[25];
+  /*
+   * Length of oom_command filled by the function.
+   */
+  size_t oom_command_len;
+  /*
+   * Directory watch timeout
+   */
+  int watch_timeout;
+} _oom_listener_descriptors;
+
+/*
+ Clean up allocated resources in a descriptor structure
+*/
+inline void cleanup(_oom_listener_descriptors *descriptors) {
+  close(descriptors->event_fd);
+  descriptors->event_fd = -1;
+  close(descriptors->event_control_fd);
+  descriptors->event_control_fd = -1;
+  close(descriptors->oom_control_fd);
+  descriptors->oom_control_fd = -1;
+  descriptors->watch_timeout = 1000;
+}
+
+/*
+ * Enable an OOM listener on the memory cgroup cgroup
+ * descriptors: Structure that holds state for testing purposes
+ * cgroup: cgroup path to watch. It has to be a memory cgroup
+ * fd: File to forward events to. Normally this is stdout
+ */
+int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd);
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
new file mode 100644
index 0000000..eb7fc3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+
+#include "oom_listener.h"
+
+void print_usage(void) {
+  fprintf(stderr, "oom-listener");
+  fprintf(stderr, "Listen to OOM events in a cgroup");
+  fprintf(stderr, "usage to listen: oom-listener <cgroup directory>\n");
+  fprintf(stderr, "usage to test: oom-listener oom [<pgid>]\n");
+  fprintf(stderr, "example listening: oom-listener /sys/fs/cgroup/memory/hadoop-yarn | xxd -c 8\n");
+  fprintf(stderr, "example oom to test: bash -c 'echo $$ >/sys/fs/cgroup/memory/hadoop-yarn/tasks;oom-listener oom'\n");
+  fprintf(stderr, "example container overload: sudo -u <user> bash -c 'echo $$ && oom-listener oom 0' >/sys/fs/cgroup/memory/hadoop-yarn/<container>/tasks\n");
+  exit(EXIT_FAILURE);
+}
+
+/*
+  Test an OOM situation adding the pid
+  to the group pgid and calling malloc in a loop
+  This can be used to test OOM listener. See examples above.
+*/
+void test_oom_infinite(char* pgids) {
+  if (pgids != NULL) {
+    int pgid = atoi(pgids);
+    setpgid(0, pgid);
+  }
+  while(1) {
+    char* p = (char*)malloc(4096);
+    if (p != NULL) {
+      p[0] = 0xFF;
+    } else {
+      exit(1);
+    }
+  }
+}
+
+/*
+ A command that receives a memory cgroup directory and
+ listens to the events in the directory.
+ It will print a new line on every out of memory event
+ to the standard output.
+ usage:
+ oom-listener <cgroup>
+*/
+int main(int argc, char *argv[]) {
+  if (argc >= 2 &&
+      strcmp(argv[1], "oom") == 0)
+    test_oom_infinite(argc < 3 ? NULL : argv[2]);
+
+  if (argc != 2)
+    print_usage();
+
+  _oom_listener_descriptors descriptors = {
+      .command = argv[0],
+      .event_fd = -1,
+      .event_control_fd = -1,
+      .oom_control_fd = -1,
+      .event_control_path = {0},
+      .oom_control_path = {0},
+      .oom_command = {0},
+      .oom_command_len = 0,
+      .watch_timeout = 1000
+  };
+
+  int ret = oom_listener(&descriptors, argv[1], STDOUT_FILENO);
+
+  cleanup(&descriptors);
+
+  return ret;
+}
+
+#else
+
+/*
+ This tool uses Linux specific functionality,
+ so it is not available for other operating systems
+*/
+int main() {
+  return 1;
+}
+
+#endif
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
new file mode 100644
index 0000000..9627632
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+extern "C" {
+#include "oom_listener.h"
+}
+
+#include <gtest/gtest.h>
+#include <fstream>
+#include <mutex>
+
+#define CGROUP_ROOT "/sys/fs/cgroup/memory/"
+#define TEST_ROOT "/tmp/test-oom-listener/"
+#define CGROUP_TASKS "tasks"
+#define CGROUP_OOM_CONTROL "memory.oom_control"
+#define CGROUP_LIMIT_PHYSICAL "memory.limit_in_bytes"
+#define CGROUP_LIMIT_SWAP "memory.memsw.limit_in_bytes"
+#define CGROUP_EVENT_CONTROL "cgroup.event_control"
+#define CGROUP_LIMIT (5 * 1024 * 1024)
+
+// We try multiple cgroup directories
+// We try first the official path to test
+// in production
+// If we are running as a user we fall back
+// to mock cgroup
+static const char *cgroup_candidates[] = { CGROUP_ROOT, TEST_ROOT };
+
+int main(int argc, char **argv) {
+  testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+class OOMListenerTest : public ::testing::Test {
+private:
+  char cgroup[PATH_MAX] = {};
+  const char* cgroup_root = nullptr;
+public:
+  OOMListenerTest() = default;
+
+  virtual ~OOMListenerTest() = default;
+  virtual const char* GetCGroup() { return cgroup; }
+  virtual void SetUp() {
+    struct stat cgroup_memory = {};
+    for (unsigned int i = 0; i < GTEST_ARRAY_SIZE_(cgroup_candidates); ++i) {
+      cgroup_root = cgroup_candidates[i];
+
+      // Try to create the root.
+      // We might not have permission and
+      // it may already exist
+      mkdir(cgroup_root, 0700);
+
+      if (0 != stat(cgroup_root, &cgroup_memory)) {
+        printf("%s missing. Skipping test\n", cgroup_root);
+        continue;
+      }
+
+      timespec timespec1 = {};
+      if (0 != clock_gettime(CLOCK_MONOTONIC, &timespec1)) {
+        ASSERT_TRUE(false) << " clock_gettime failed\n";
+      }
+
+      if (snprintf(cgroup, sizeof(cgroup), "%s%lx/",
+                        cgroup_root, timespec1.tv_nsec) <= 0) {
+        cgroup[0] = '\0';
+        printf("%s snprintf failed\n", cgroup_root);
+        continue;
+      }
+
+      // Create a cgroup named the current timestamp
+      // to make it quasi unique
+      if (0 != mkdir(cgroup, 0700)) {
+        printf("%s not writable.\n", cgroup);
+        continue;
+      }
+      break;
+    }
+
+    ASSERT_EQ(0, stat(cgroup, &cgroup_memory))
+                  << "Cannot use or simulate cgroup " << cgroup;
+  }
+  virtual void TearDown() {
+    if (cgroup[0] != '\0') {
+      rmdir(cgroup);
+    }
+    if (cgroup_root != nullptr &&
+        cgroup_root != cgroup_candidates[0]) {
+      rmdir(cgroup_root);
+    }
+  }
+};
+
+/*
+  Unit test for cgroup testing. There are two modes.
+  If the unit test is run as root and we have cgroups
+  we try to crate a cgroup and generate an OOM.
+  If we are not running as root we just sleep instead of
+  hogging memory and simulate the OOM by sending
+  an event in a mock event fd mock_oom_event_as_user.
+*/
+TEST_F(OOMListenerTest, test_oom) {
+  // Disable OOM killer
+  std::ofstream oom_control;
+  std::string oom_control_file =
+      std::string(GetCGroup()).append(CGROUP_OOM_CONTROL);
+  oom_control.open(oom_control_file.c_str(), oom_control.out);
+  oom_control << 1 << std::endl;
+  oom_control.close();
+
+  // Set a low enough limit for physical
+  std::ofstream limit;
+  std::string limit_file =
+      std::string(GetCGroup()).append(CGROUP_LIMIT_PHYSICAL);
+  limit.open(limit_file.c_str(), limit.out);
+  limit << CGROUP_LIMIT << std::endl;
+  limit.close();
+
+  // Set a low enough limit for physical + swap
+  std::ofstream limitSwap;
+  std::string limit_swap_file =
+      std::string(GetCGroup()).append(CGROUP_LIMIT_SWAP);
+  limitSwap.open(limit_swap_file.c_str(), limitSwap.out);
+  limitSwap << CGROUP_LIMIT << std::endl;
+  limitSwap.close();
+
+  // Event control file to set
+  std::string memory_control_file =
+      std::string(GetCGroup()).append(CGROUP_EVENT_CONTROL);
+
+  // Tasks file to check
+  std::string tasks_file =
+      std::string(GetCGroup()).append(CGROUP_TASKS);
+
+  int mock_oom_event_as_user = -1;
+  struct stat stat1 = {};
+  if (0 != stat(memory_control_file.c_str(), &stat1)) {
+    // We cannot tamper with cgroups
+    // running as a user, so simulate an
+    // oom event
+    mock_oom_event_as_user = eventfd(0, 0);
+  }
+  const int simulate_cgroups =
+      mock_oom_event_as_user != -1;
+
+  __pid_t mem_hog_pid = fork();
+  if (!mem_hog_pid) {
+    // Child process to consume too much memory
+    if (simulate_cgroups) {
+      std::cout << "Simulating cgroups OOM" << std::endl;
+      for (;;) {
+        sleep(1);
+      }
+    } else {
+      // Wait until we are added to the cgroup
+      // so that it is accounted for our mem
+      // usage
+      __pid_t cgroupPid;
+      do {
+        std::ifstream tasks;
+        tasks.open(tasks_file.c_str(), tasks.in);
+        tasks >> cgroupPid;
+        tasks.close();
+      } while (cgroupPid != getpid());
+
+      // Start consuming as much memory as we can.
+      // cgroup will stop us at CGROUP_LIMIT
+      const int bufferSize = 1024 * 1024;
+      std::cout << "Consuming too much memory" << std::endl;
+      for (;;) {
+        auto buffer = (char *) malloc(bufferSize);
+        if (buffer != nullptr) {
+          for (int i = 0; i < bufferSize; ++i) {
+            buffer[i] = (char) std::rand();
+          }
+        }
+      }
+    }
+  } else {
+    // Parent test
+    ASSERT_GE(mem_hog_pid, 1) << "Fork failed " << errno;
+
+    // Put child into cgroup
+    std::ofstream tasks;
+    tasks.open(tasks_file.c_str(), tasks.out);
+    tasks << mem_hog_pid << std::endl;
+    tasks.close();
+
+    // Create pipe to get forwarded eventfd
+    int test_pipe[2];
+    ASSERT_EQ(0, pipe(test_pipe));
+
+    // Launch OOM listener
+    // There is no race condition with the process
+    // running out of memory. If oom is 1 at startup
+    // oom_listener will send an initial notification
+    __pid_t listener = fork();
+    if (listener == 0) {
+      // child listener forwarding cgroup events
+      _oom_listener_descriptors descriptors = {
+          .command = "test",
+          .event_fd = mock_oom_event_as_user,
+          .event_control_fd = -1,
+          .oom_control_fd = -1,
+          .event_control_path = {0},
+          .oom_control_path = {0},
+          .oom_command = {0},
+          .oom_command_len = 0,
+          .watch_timeout = 100
+      };
+      int ret = oom_listener(&descriptors, GetCGroup(), test_pipe[1]);
+      cleanup(&descriptors);
+      close(test_pipe[0]);
+      close(test_pipe[1]);
+      exit(ret);
+    } else {
+    // Parent test
+      uint64_t event_id = 1;
+      if (simulate_cgroups) {
+        // We cannot tamper with cgroups
+        // running as a user, so simulate an
+        // oom event
+        ASSERT_EQ(sizeof(event_id),
+                  write(mock_oom_event_as_user,
+                        &event_id,
+                        sizeof(event_id)));
+      }
+      ASSERT_EQ(sizeof(event_id),
+                read(test_pipe[0],
+                     &event_id,
+                     sizeof(event_id)))
+                    << "The event has not arrived";
+      close(test_pipe[0]);
+      close(test_pipe[1]);
+
+      // Simulate OOM killer
+      ASSERT_EQ(0, kill(mem_hog_pid, SIGKILL));
+
+      // Verify that process was killed
+      __WAIT_STATUS mem_hog_status = {};
+      __pid_t exited0 = wait(mem_hog_status);
+      ASSERT_EQ(mem_hog_pid, exited0)
+        << "Wrong process exited";
+      ASSERT_EQ(nullptr, mem_hog_status)
+        << "Test process killed with invalid status";
+
+      if (mock_oom_event_as_user != -1) {
+        ASSERT_EQ(0, unlink(oom_control_file.c_str()));
+        ASSERT_EQ(0, unlink(limit_file.c_str()));
+        ASSERT_EQ(0, unlink(limit_swap_file.c_str()));
+        ASSERT_EQ(0, unlink(tasks_file.c_str()));
+        ASSERT_EQ(0, unlink(memory_control_file.c_str()));
+      }
+      // Once the cgroup is empty delete it
+      ASSERT_EQ(0, rmdir(GetCGroup()))
+                << "Could not delete cgroup " << GetCGroup();
+
+      // Check that oom_listener exited on the deletion of the cgroup
+      __WAIT_STATUS oom_listener_status = {};
+      __pid_t exited1 = wait(oom_listener_status);
+      ASSERT_EQ(listener, exited1)
+        << "Wrong process exited";
+      ASSERT_EQ(nullptr, oom_listener_status)
+        << "Listener process exited with invalid status";
+    }
+  }
+}
+
+#else
+/*
+This tool covers Linux specific functionality,
+so it is not available for other operating systems
+*/
+int main() {
+  return 1;
+}
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
new file mode 100644
index 0000000..54bcb13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Runnable that does not do anything.
+ */
+public class DummyRunnableWithContext implements Runnable {
+  public DummyRunnableWithContext(Context context, boolean virtual) {
+  }
+  @Override
+  public void run() {
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org