You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/30 21:01:43 UTC
[16/50] [abbrv] hadoop git commit: YARN-4599. Set OOM control for
memory cgroups. (Miklos Szegedi via Haibo Chen)
YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9686584f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9686584f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9686584f
Branch: refs/heads/HDDS-48
Commit: 9686584f919df716b415c1477572b4c31752c872
Parents: 4772e79
Author: Haibo Chen <ha...@apache.org>
Authored: Wed May 23 11:29:55 2018 -0700
Committer: Hanisha Koneru <ha...@apache.org>
Committed: Wed May 30 14:00:25 2018 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../hadoop/yarn/conf/YarnConfiguration.java | 26 +-
.../src/main/resources/yarn-default.xml | 67 ++-
.../src/CMakeLists.txt | 19 +
.../CGroupElasticMemoryController.java | 476 +++++++++++++++++++
.../linux/resources/CGroupsHandler.java | 6 +
.../linux/resources/CGroupsHandlerImpl.java | 6 +-
.../CGroupsMemoryResourceHandlerImpl.java | 15 -
.../linux/resources/DefaultOOMHandler.java | 254 ++++++++++
.../monitor/ContainersMonitorImpl.java | 50 ++
.../executor/ContainerSignalContext.java | 41 ++
.../native/oom-listener/impl/oom_listener.c | 171 +++++++
.../native/oom-listener/impl/oom_listener.h | 102 ++++
.../oom-listener/impl/oom_listener_main.c | 104 ++++
.../oom-listener/test/oom_listener_test_main.cc | 292 ++++++++++++
.../resources/DummyRunnableWithContext.java | 31 ++
.../TestCGroupElasticMemoryController.java | 319 +++++++++++++
.../TestCGroupsMemoryResourceHandlerImpl.java | 6 +-
.../linux/resources/TestDefaultOOMHandler.java | 307 ++++++++++++
.../monitor/TestContainersMonitor.java | 1 +
.../TestContainersMonitorResourceChange.java | 3 +-
.../site/markdown/NodeManagerCGroupsMemory.md | 133 ++++++
22 files changed, 2391 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 934c009..428950b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,7 @@
target
build
dependency-reduced-pom.xml
+make-build-debug
# Filesystem contract test options and credentials
auth-keys.xml
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8e56cb8..6d08831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1440,6 +1440,25 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "vmem-pmem-ratio";
public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;
+ /** Specifies whether to do memory check on overall usage. */
+ public static final String NM_ELASTIC_MEMORY_CONTROL_ENABLED = NM_PREFIX
+ + "elastic-memory-control.enabled";
+ public static final boolean DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED = false;
+
+ /** Specifies the OOM handler code. */
+ public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER = NM_PREFIX
+ + "elastic-memory-control.oom-handler";
+
+ /** The path to the OOM listener.*/
+ public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH =
+ NM_PREFIX + "elastic-memory-control.oom-listener.path";
+
+ /** Maximum time in seconds to resolve an OOM situation. */
+ public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC =
+ NM_PREFIX + "elastic-memory-control.timeout-sec";
+ public static final Integer
+ DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = 5;
+
/** Number of Virtual CPU Cores which can be allocated for containers.*/
public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
public static final int DEFAULT_NM_VCORES = 8;
@@ -2006,13 +2025,6 @@ public class YarnConfiguration extends Configuration {
/** The path to the Linux container executor.*/
public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
NM_PREFIX + "linux-container-executor.path";
-
- /**
- * The UNIX group that the linux-container-executor should run as.
- * This is intended to be set as part of container-executor.cfg.
- */
- public static final String NM_LINUX_CONTAINER_GROUP =
- NM_PREFIX + "linux-container-executor.group";
/**
* True if linux-container-executor should limit itself to one user
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 156ca24..da44ccb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -772,7 +772,7 @@
<property>
<description>Maximum size in bytes for configurations that can be provided
by application to RM for delegation token renewal.
- By experiment, it's roughly 128 bytes per key-value pair.
+ By experiment, it's roughly 128 bytes per key-value pair.
The default value 12800 allows roughly 100 configs, may be less.
</description>
<name>yarn.resourcemanager.delegation-token.max-conf-size-bytes</name>
@@ -1860,14 +1860,6 @@
</property>
<property>
- <description>
- The UNIX group that the linux-container-executor should run as.
- </description>
- <name>yarn.nodemanager.linux-container-executor.group</name>
- <value></value>
- </property>
-
- <property>
<description>T-file compression types used to compress aggregated logs.</description>
<name>yarn.nodemanager.log-aggregation.compression-type</name>
<value>none</value>
@@ -2158,7 +2150,7 @@
<description>
In the server side it indicates whether timeline service is enabled or not.
And in the client side, users can enable it to indicate whether client wants
- to use timeline service. If it's enabled in the client side along with
+ to use timeline service. If it's enabled in the client side along with
security, then yarn client tries to fetch the delegation tokens for the
timeline server.
</description>
@@ -3404,7 +3396,7 @@
<description>
Defines the limit of the diagnostics message of an application
attempt, in kilo characters (character count * 1024).
- When using ZooKeeper to store application state behavior, it's
+ When using ZooKeeper to store application state behavior, it's
important to limit the size of the diagnostic messages to
prevent YARN from overwhelming ZooKeeper. In cases where
yarn.resourcemanager.state-store.max-completed-applications is set to
@@ -3819,4 +3811,57 @@
<value>/usr/bin/numactl</value>
</property>
+ <property>
+ <description>
+ Enable elastic memory control. This is a Linux only feature.
+ When enabled, the node manager adds a listener to receive an
+ event, if all the containers exceeded a limit.
+ The limit is specified by yarn.nodemanager.resource.memory-mb.
+ If this is not set, the limit is set based on the capabilities.
+ See yarn.nodemanager.resource.detect-hardware-capabilities
+ for details.
+ The limit applies to the physical or virtual (rss+swap) memory
+ depending on whether yarn.nodemanager.pmem-check-enabled or
+ yarn.nodemanager.vmem-check-enabled is set.
+ </description>
+ <name>yarn.nodemanager.elastic-memory-control.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>
+ The name of a JVM class. The class must implement the Runnable
+ interface. It is called,
+ if yarn.nodemanager.elastic-memory-control.enabled
+ is set and the system reaches its memory limit.
+ When called the handler must preempt a container,
+ since all containers are frozen by cgroups.
+ Once preempted some memory is released, so that the
+ kernel can resume all containers. Because of this the
+ handler has to act quickly.
+ </description>
+ <name>yarn.nodemanager.elastic-memory-control.oom-handler</name>
+ <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.DefaultOOMHandler</value>
+ </property>
+
+ <property>
+ <description>
+ The path to the oom-listener tool. Elastic memory control is only
+ supported on Linux. It relies on kernel events. The tool forwards
+ these kernel events to the standard input, so that the node manager
+ can preempt containers, in an out-of-memory scenario.
+ You rarely need to update this setting.
+ </description>
+ <name>yarn.nodemanager.elastic-memory-control.oom-listener.path</name>
+ <value></value>
+ </property>
+
+ <property>
+ <description>
+ Maximum time to wait for an OOM situation to get resolved before
+ bringing down the node.
+ </description>
+ <name>yarn.nodemanager.elastic-memory-control.timeout-sec</name>
+ <value>5</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
index 79faeec..a614f80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
@@ -30,6 +30,7 @@ string(REGEX MATCH . HCD_ONE "${HADOOP_CONF_DIR}")
string(COMPARE EQUAL ${HCD_ONE} / HADOOP_CONF_DIR_IS_ABS)
set (CMAKE_C_STANDARD 99)
+set (CMAKE_CXX_STANDARD 11)
include(CheckIncludeFiles)
check_include_files("sys/types.h;sys/sysctl.h" HAVE_SYS_SYSCTL_H)
@@ -113,6 +114,7 @@ include_directories(
${GTEST_SRC_DIR}/include
main/native/container-executor
main/native/container-executor/impl
+ main/native/oom-listener/impl
)
# add gtest as system library to suppress gcc warnings
include_directories(SYSTEM ${GTEST_SRC_DIR}/include)
@@ -171,3 +173,20 @@ add_executable(cetest
main/native/container-executor/test/utils/test_docker_util.cc)
target_link_libraries(cetest gtest container)
output_directory(cetest test)
+
+# CGroup OOM listener
+add_executable(oom-listener
+ main/native/oom-listener/impl/oom_listener.c
+ main/native/oom-listener/impl/oom_listener.h
+ main/native/oom-listener/impl/oom_listener_main.c
+)
+output_directory(oom-listener target/usr/local/bin)
+
+# CGroup OOM listener test with GTest
+add_executable(test-oom-listener
+ main/native/oom-listener/impl/oom_listener.c
+ main/native/oom-listener/impl/oom_listener.h
+ main/native/oom-listener/test/oom_listener_test_main.cc
+)
+target_link_libraries(test-oom-listener gtest)
+output_directory(test-oom-listener test)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
new file mode 100644
index 0000000..752c3a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+import java.io.File;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.nio.charset.Charset;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_NO_LIMIT;
+
+/**
+ * This thread controls memory usage using cgroups. It listens to out of memory
+ * events of all the containers together, and if we go over the limit picks
+ * a container to kill. The algorithm that picks the container is a plugin.
+ */
+public class CGroupElasticMemoryController extends Thread {
+ protected static final Log LOG = LogFactory
+ .getLog(CGroupElasticMemoryController.class);
+ private final Clock clock = new MonotonicClock();
+ private String yarnCGroupPath;
+ private String oomListenerPath;
+ private Runnable oomHandler;
+ private CGroupsHandler cgroups;
+ private boolean controlPhysicalMemory;
+ private boolean controlVirtualMemory;
+ private long limit;
+ private Process process = null;
+ private boolean stopped = false;
+ private int timeoutMS;
+
+ /**
+ * Default constructor.
+ * @param conf Yarn configuration to use
+ * @param context Node manager context to out of memory handler
+ * @param cgroups Cgroups handler configured
+ * @param controlPhysicalMemory Whether to listen to physical memory OOM
+ * @param controlVirtualMemory Whether to listen to virtual memory OOM
+ * @param limit memory limit in bytes
+ * @param oomHandlerOverride optional OOM handler
+ * @exception YarnException Could not instantiate class
+ */
+ @VisibleForTesting
+ CGroupElasticMemoryController(Configuration conf,
+ Context context,
+ CGroupsHandler cgroups,
+ boolean controlPhysicalMemory,
+ boolean controlVirtualMemory,
+ long limit,
+ Runnable oomHandlerOverride)
+ throws YarnException {
+ super("CGroupElasticMemoryController");
+ boolean controlVirtual = controlVirtualMemory && !controlPhysicalMemory;
+ Runnable oomHandlerTemp =
+ getDefaultOOMHandler(conf, context, oomHandlerOverride, controlVirtual);
+ if (controlPhysicalMemory && controlVirtualMemory) {
+ LOG.warn(
+ NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
+ "We cannot control both virtual and physical " +
+ "memory at the same time. Enforcing virtual memory. " +
+ "If swapping is enabled set " +
+ "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
+ "only " + NM_VMEM_CHECK_ENABLED + " to true.");
+ }
+ if (!controlPhysicalMemory && !controlVirtualMemory) {
+ throw new YarnException(
+ NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
+ "We need either virtual or physical memory check requested. " +
+ "If swapping is enabled set " +
+ "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
+ "only " + NM_VMEM_CHECK_ENABLED + " to true.");
+ }
+ // We are safe at this point that no more exceptions can be thrown
+ this.timeoutMS =
+ 1000 * conf.getInt(NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC,
+ DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC);
+ this.oomListenerPath = getOOMListenerExecutablePath(conf);
+ this.oomHandler = oomHandlerTemp;
+ this.cgroups = cgroups;
+ this.controlPhysicalMemory = !controlVirtual;
+ this.controlVirtualMemory = controlVirtual;
+ this.yarnCGroupPath = this.cgroups
+ .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, "");
+ this.limit = limit;
+ }
+
+ /**
+ * Get the configured OOM handler.
+ * @param conf configuration
+ * @param context context to pass to constructor
+ * @param oomHandlerLocal Default override
+ * @param controlVirtual Control physical or virtual memory
+ * @return The configured or overridden OOM handler.
+ * @throws YarnException in case the constructor failed
+ */
+ private Runnable getDefaultOOMHandler(
+ Configuration conf, Context context, Runnable oomHandlerLocal,
+ boolean controlVirtual)
+ throws YarnException {
+ Class oomHandlerClass =
+ conf.getClass(
+ YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER,
+ DefaultOOMHandler.class);
+ if (oomHandlerLocal == null) {
+ try {
+ Constructor constr = oomHandlerClass.getConstructor(
+ Context.class, boolean.class);
+ oomHandlerLocal = (Runnable)constr.newInstance(
+ context, controlVirtual);
+ } catch (Exception ex) {
+ throw new YarnException(ex);
+ }
+ }
+ return oomHandlerLocal;
+ }
+
+ /**
+ * Default constructor.
+ * @param conf Yarn configuration to use
+ * @param context Node manager context to out of memory handler
+ * @param cgroups Cgroups handler configured
+ * @param controlPhysicalMemory Whether to listen to physical memory OOM
+ * @param controlVirtualMemory Whether to listen to virtual memory OOM
+ * @param limit memory limit in bytes
+ * @exception YarnException Could not instantiate class
+ */
+ public CGroupElasticMemoryController(Configuration conf,
+ Context context,
+ CGroupsHandler cgroups,
+ boolean controlPhysicalMemory,
+ boolean controlVirtualMemory,
+ long limit)
+ throws YarnException {
+ this(conf,
+ context,
+ cgroups,
+ controlPhysicalMemory,
+ controlVirtualMemory,
+ limit,
+ null);
+ }
+
+ /**
+ * Exception thrown if the OOM situation is not resolved.
+ */
+ static private class OOMNotResolvedException extends YarnRuntimeException {
+ OOMNotResolvedException(String message, Exception parent) {
+ super(message, parent);
+ }
+ }
+
+ /**
+ * Stop listening to the cgroup.
+ */
+ public synchronized void stopListening() {
+ stopped = true;
+ if (process != null) {
+ process.destroyForcibly();
+ } else {
+ LOG.warn("Trying to stop listening, when listening is not running");
+ }
+ }
+
+ /**
+ * Checks if the CGroupElasticMemoryController is available on this system.
+ * This assumes that Linux container executor is already initialized.
+ * We need to have CGroups enabled.
+ *
+ * @return True if CGroupElasticMemoryController is available.
+ * False otherwise.
+ */
+ public static boolean isAvailable() {
+ try {
+ if (!Shell.LINUX) {
+ LOG.info("CGroupElasticMemoryController currently is supported only "
+ + "on Linux.");
+ return false;
+ }
+ if (ResourceHandlerModule.getCGroupsHandler() == null ||
+ ResourceHandlerModule.getMemoryResourceHandler() == null) {
+ LOG.info("CGroupElasticMemoryController requires enabling " +
+ "memory CGroups with " +
+ YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED);
+ return false;
+ }
+ } catch (SecurityException se) {
+ LOG.info("Failed to get Operating System name. " + se);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Main OOM listening thread. It uses an external process to listen to
+ * Linux events. The external process does not need to run as root, so
+ * it is not related to container-executor. We do not use JNI for security
+ * reasons.
+ */
+ @Override
+ public void run() {
+ ExecutorService executor = null;
+ try {
+ // Disable OOM killer and set a limit.
+ // This has to be set first, so that we get notified about valid events.
+ // We will be notified about events even, if they happened before
+ // oom-listener started
+ setCGroupParameters();
+
+ // Start a listener process
+ ProcessBuilder oomListener = new ProcessBuilder();
+ oomListener.command(oomListenerPath, yarnCGroupPath);
+ synchronized (this) {
+ if (!stopped) {
+ process = oomListener.start();
+ } else {
+ resetCGroupParameters();
+ LOG.info("Listener stopped before starting");
+ return;
+ }
+ }
+ LOG.info(String.format("Listening on %s with %s",
+ yarnCGroupPath,
+ oomListenerPath));
+
+ // We need 1 thread for the error stream and a few others
+ // as a watchdog for the OOM killer
+ executor = Executors.newFixedThreadPool(2);
+
+ // Listen to any errors in the background. We do not expect this to
+ // be large in size, so it will fit into a string.
+ Future<String> errorListener = executor.submit(
+ () -> IOUtils.toString(process.getErrorStream(),
+ Charset.defaultCharset()));
+
+ // We get Linux event increments (8 bytes) forwarded from the event stream
+ // The events cannot be split, so it is safe to read them as a whole
+ // There is no race condition with the cgroup
+ // running out of memory. If oom is 1 at startup
+ // oom_listener will send an initial notification
+ InputStream events = process.getInputStream();
+ byte[] event = new byte[8];
+ int read;
+ // This loop can be exited by terminating the process
+ // with stopListening()
+ while ((read = events.read(event)) == event.length) {
+ // An OOM event has occurred
+ resolveOOM(executor);
+ }
+
+ if (read != -1) {
+ LOG.warn(String.format("Characters returned from event handler: %d",
+ read));
+ }
+
+ // If the input stream is closed, we wait for exit or process terminated.
+ int exitCode = process.waitFor();
+ String error = errorListener.get();
+ process = null;
+ LOG.info(String.format("OOM listener exited %d %s", exitCode, error));
+ } catch (OOMNotResolvedException ex) {
+ // We could mark the node unhealthy but it shuts down the node anyways.
+ // Let's just bring down the node manager since all containers are frozen.
+ throw new YarnRuntimeException("Could not resolve OOM", ex);
+ } catch (Exception ex) {
+ synchronized (this) {
+ if (!stopped) {
+ LOG.warn("OOM Listener exiting.", ex);
+ }
+ }
+ } finally {
+ // Make sure we do not leak the child process,
+ // especially if process.waitFor() did not finish.
+ if (process != null && process.isAlive()) {
+ process.destroyForcibly();
+ }
+ if (executor != null) {
+ try {
+ executor.awaitTermination(6, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Exiting without processing all OOM events.");
+ }
+ executor.shutdown();
+ }
+ resetCGroupParameters();
+ }
+ }
+
+ /**
+ * Resolve an OOM event.
+ * Listen to the handler timeouts.
+ * @param executor Executor to create watchdog with.
+ * @throws InterruptedException interrupted
+ * @throws java.util.concurrent.ExecutionException cannot launch watchdog
+ */
+ private void resolveOOM(ExecutorService executor)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ // Just log, when we are still in OOM after a couple of seconds
+ final long start = clock.getTime();
+ Future<Boolean> watchdog =
+ executor.submit(() -> watchAndLogOOMState(start));
+ // Kill something to resolve the issue
+ try {
+ oomHandler.run();
+ } catch (RuntimeException ex) {
+ watchdog.cancel(true);
+ throw new OOMNotResolvedException("OOM handler failed", ex);
+ }
+ if (!watchdog.get()) {
+ // If we are still in OOM,
+ // the watchdog will trigger stop
+ // listening to exit this loop
+ throw new OOMNotResolvedException("OOM handler timed out", null);
+ }
+ }
+
+ /**
+ * Just watch until we are in OOM and log. Send an update log every second.
+ * @return if the OOM was resolved successfully
+ */
+ private boolean watchAndLogOOMState(long start) {
+ long lastLog = start;
+ try {
+ long end = start;
+ // Throw an error, if we are still in OOM after 5 seconds
+ while(end - start < timeoutMS) {
+ end = clock.getTime();
+ String underOOM = cgroups.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL);
+ if (underOOM.contains(CGroupsHandler.UNDER_OOM)) {
+ if (end - lastLog > 1000) {
+ LOG.warn(String.format(
+ "OOM not resolved in %d ms", end - start));
+ lastLog = end;
+ }
+ } else {
+ LOG.info(String.format(
+ "Resolved OOM in %d ms", end - start));
+ return true;
+ }
+ // We do not want to saturate the CPU
+ // leaving the resources to the actual OOM killer
+ // but we want to be fast, too.
+ Thread.sleep(10);
+ }
+ } catch (InterruptedException ex) {
+ LOG.debug("Watchdog interrupted");
+ } catch (Exception e) {
+ LOG.warn("Exception running logging thread", e);
+ }
+ LOG.warn(String.format("OOM was not resolved in %d ms",
+ clock.getTime() - start));
+ stopListening();
+ return false;
+ }
+
+ /**
+ * Update root memory cgroup. This contains all containers.
+ * The physical limit has to be set first then the virtual limit.
+ */
+ private void setCGroupParameters() throws ResourceHandlerException {
+ // Disable the OOM killer
+ cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL, "1");
+ if (controlPhysicalMemory && !controlVirtualMemory) {
+ try {
+ // Ignore virtual memory limits, since we do not know what it is set to
+ cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+ } catch (ResourceHandlerException ex) {
+ LOG.debug("Swap monitoring is turned off in the kernel");
+ }
+ // Set physical memory limits
+ cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
+ } else if (controlVirtualMemory && !controlPhysicalMemory) {
+ // Ignore virtual memory limits, since we do not know what it is set to
+ cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+ // Set physical limits to no more than virtual limits
+ cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
+ // Set virtual memory limits
+ // Important: it has to be set after physical limit is set
+ cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, Long.toString(limit));
+ } else {
+ throw new ResourceHandlerException(
+ String.format("Unsupported scenario physical:%b virtual:%b",
+ controlPhysicalMemory, controlVirtualMemory));
+ }
+ }
+
+ /**
+ * Reset root memory cgroup to OS defaults. This controls all containers.
+ */
+ private void resetCGroupParameters() {
+ try {
+ try {
+ // Disable memory limits
+ cgroups.updateCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+ } catch (ResourceHandlerException ex) {
+ LOG.debug("Swap monitoring is turned off in the kernel");
+ }
+ cgroups.updateCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
+ // Enable the OOM killer
+ cgroups.updateCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY, "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL, "0");
+ } catch (ResourceHandlerException ex) {
+ LOG.warn("Error in cleanup", ex);
+ }
+ }
+
+ private static String getOOMListenerExecutablePath(Configuration conf) {
+ String yarnHomeEnvVar =
+ System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
+ if (yarnHomeEnvVar == null) {
+ yarnHomeEnvVar = ".";
+ }
+ File hadoopBin = new File(yarnHomeEnvVar, "bin");
+ String defaultPath =
+ new File(hadoopBin, "oom-listener").getAbsolutePath();
+ final String path = conf.get(
+ YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
+ defaultPath);
+ LOG.debug(String.format("oom-listener path: %s %s", path, defaultPath));
+ return path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
index e279504..9dc16c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
@@ -76,8 +76,14 @@ public interface CGroupsHandler {
String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
String CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES = "limit_in_bytes";
+ String CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES = "memsw.limit_in_bytes";
String CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES = "soft_limit_in_bytes";
+ String CGROUP_PARAM_MEMORY_OOM_CONTROL = "oom_control";
String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness";
+ String CGROUP_PARAM_MEMORY_USAGE_BYTES = "usage_in_bytes";
+ String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
+ String CGROUP_NO_LIMIT = "-1";
+ String UNDER_OOM = "under_oom 1";
String CGROUP_CPU_PERIOD_US = "cfs_period_us";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
index 008f3d7..6ed94e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
@@ -594,7 +594,11 @@ class CGroupsHandlerImpl implements CGroupsHandler {
@Override
public String getCGroupParam(CGroupController controller, String cGroupId,
String param) throws ResourceHandlerException {
- String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
+ String cGroupParamPath =
+ param.equals(CGROUP_FILE_TASKS) ?
+ getPathForCGroup(controller, cGroupId)
+ + Path.SEPARATOR + param :
+ getPathForCGroupParam(controller, cGroupId, param);
try {
byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
index 2d1585e..a57adb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java
@@ -65,21 +65,6 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
@Override
public List<PrivilegedOperation> bootstrap(Configuration conf)
throws ResourceHandlerException {
- boolean pmemEnabled =
- conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED,
- YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
- boolean vmemEnabled =
- conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
- YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
- if (pmemEnabled || vmemEnabled) {
- String msg = "The default YARN physical and/or virtual memory health"
- + " checkers as well as the CGroups memory controller are enabled. "
- + "If you wish to use the Cgroups memory controller, please turn off"
- + " the default physical/virtual memory checkers by setting "
- + YarnConfiguration.NM_PMEM_CHECK_ENABLED + " and "
- + YarnConfiguration.NM_VMEM_CHECK_ENABLED + " to false.";
- throw new ResourceHandlerException(msg);
- }
this.cGroupsHandler.initializeCGroupController(MEMORY);
enforce = conf.getBoolean(
YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
new file mode 100644
index 0000000..c690225
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
+import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
+
+/**
+ * A very basic OOM handler implementation.
+ * See the javadoc on the run() method for details.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class DefaultOOMHandler implements Runnable {
+ protected static final Log LOG = LogFactory
+ .getLog(DefaultOOMHandler.class);
+ private Context context;
+ private boolean virtual;
+ private CGroupsHandler cgroups;
+
+ /**
+ * Create an OOM handler.
+ * This has to be public to be able to construct through reflection.
+ * @param context node manager context to work with
+ * @param testVirtual Test virtual memory or physical
+ */
+ public DefaultOOMHandler(Context context, boolean testVirtual) {
+ this.context = context;
+ this.virtual = testVirtual;
+ this.cgroups = ResourceHandlerModule.getCGroupsHandler();
+ }
+
+ @VisibleForTesting
+ void setCGroupsHandler(CGroupsHandler handler) {
+ cgroups = handler;
+ }
+
+ /**
+ * Kill the container, if it has exceeded its request.
+ *
+ * @param container Container to check
+ * @param fileName CGroup filename (physical or swap/virtual)
+ * @return true, if the container was preempted
+ */
+ private boolean killContainerIfOOM(Container container, String fileName) {
+ String value = null;
+ try {
+ value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ container.getContainerId().toString(),
+ fileName);
+ long usage = Long.parseLong(value);
+ long request = container.getResource().getMemorySize() * 1024 * 1024;
+
+ // Check if the container has exceeded its limits.
+ if (usage > request) {
+ // Kill the container
+ // We could call the regular cleanup but that sends a
+ // SIGTERM first that cannot be handled by frozen processes.
+ // Walk through the cgroup
+ // tasks file and kill all processes in it
+ sigKill(container);
+ String message = String.format(
+ "Container %s was killed by elastic cgroups OOM handler using %d " +
+ "when requested only %d",
+ container.getContainerId(), usage, request);
+ LOG.warn(message);
+ return true;
+ }
+ } catch (ResourceHandlerException ex) {
+ LOG.warn(String.format("Could not access memory resource for %s",
+ container.getContainerId()), ex);
+ } catch (NumberFormatException ex) {
+ LOG.warn(String.format("Could not parse %s in %s",
+ value, container.getContainerId()));
+ }
+ return false;
+ }
+
+ /**
+ * SIGKILL the specified container. We do this not using the standard
+ * container logic. The reason is that the processes are frozen by
+ * the cgroups OOM handler, so they cannot respond to SIGTERM.
+ * On the other hand we have to be as fast as possible.
+ * We walk through the list of active processes in the container.
+ * This is needed because frozen parents cannot signal their children.
+ * We kill each process and then try again until the whole cgroup
+ * is cleaned up. This logic avoids leaking processes in a cgroup.
+ * Currently the killing only succeeds for PGIDS.
+ *
+ * @param container Container to clean up
+ */
+ private void sigKill(Container container) {
+ boolean finished = false;
+ try {
+ while (!finished) {
+ String[] pids =
+ cgroups.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ container.getContainerId().toString(),
+ CGROUP_FILE_TASKS)
+ .split("\n");
+ finished = true;
+ for (String pid : pids) {
+ // Note: this kills only PGIDs currently
+ if (pid != null && !pid.isEmpty()) {
+ LOG.debug(String.format(
+ "Terminating container %s Sending SIGKILL to -%s",
+ container.getContainerId().toString(),
+ pid));
+ finished = false;
+ try {
+ context.getContainerExecutor().signalContainer(
+ new ContainerSignalContext.Builder().setContainer(container)
+ .setUser(container.getUser())
+ .setPid(pid).setSignal(ContainerExecutor.Signal.KILL)
+ .build());
+ } catch (IOException ex) {
+ LOG.warn(String.format("Cannot kill container %s pid -%s.",
+ container.getContainerId(), pid), ex);
+ }
+ }
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while waiting for processes to disappear");
+ }
+ }
+ } catch (ResourceHandlerException ex) {
+ LOG.warn(String.format(
+ "Cannot list more tasks in container %s to kill.",
+ container.getContainerId()));
+ }
+ }
+
+ /**
+ * It is called when the node is under an OOM condition. All processes in
+ * all sub-cgroups are suspended. We need to act fast, so that we do not
+ * affect the overall system utilization.
+ * In general we try to find a newly run container that exceeded its limits.
+ * The justification is cost, since probably this is the one that has
+ * accumulated the least amount of uncommitted data so far.
+ * We continue the process until the OOM is resolved.
+ */
+ @Override
+ public void run() {
+ try {
+ // Reverse order by start time
+ Comparator<Container> comparator = (Container o1, Container o2) -> {
+ long order = o1.getContainerStartTime() - o2.getContainerStartTime();
+ return order > 0 ? -1 : order < 0 ? 1 : 0;
+ };
+
+ // We kill containers until the kernel reports the OOM situation resolved
+ // Note: If the kernel has a delay this may kill more than necessary
+ while (true) {
+ String status = cgroups.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL);
+ if (!status.contains(CGroupsHandler.UNDER_OOM)) {
+ break;
+ }
+
+ // The first pass kills a recent container
+ // that uses more than its request
+ ArrayList<Container> containers = new ArrayList<>();
+ containers.addAll(context.getContainers().values());
+ // Note: Sorting may take a long time with 10K+ containers
+ // but it is acceptable now with low number of containers per node
+ containers.sort(comparator);
+
+ // Kill the latest container that exceeded its request
+ boolean found = false;
+ for (Container container : containers) {
+ if (!virtual) {
+ if (killContainerIfOOM(container,
+ CGROUP_PARAM_MEMORY_USAGE_BYTES)) {
+ found = true;
+ break;
+ }
+ } else {
+ if (killContainerIfOOM(container,
+ CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
+ found = true;
+ break;
+ }
+ }
+ }
+ if (found) {
+ continue;
+ }
+
+ // We have not found any containers that ran out of their limit,
+ // so we will kill the latest one. This can happen, if all containers
+ // use close to their request and one of them requests a big block
+ // triggering the OOM freeze.
+ // Currently there is no other way to identify the outstanding one.
+ if (containers.size() > 0) {
+ Container container = containers.get(0);
+ sigKill(container);
+ String message = String.format(
+ "Newest container %s killed by elastic cgroups OOM handler using",
+ container.getContainerId());
+ LOG.warn(message);
+ continue;
+ }
+
+ // This can happen, if SIGKILL did not clean up
+ // non-PGID processes or containers launched by other users
+ // or if a process was put to the root YARN cgroup.
+ throw new YarnRuntimeException(
+ "Could not find any containers but CGroups " +
+ "reserved for containers ran out of memory. " +
+ "I am giving up");
+ }
+ } catch (ResourceHandlerException ex) {
+ LOG.warn("Could not fecth OOM status. " +
+ "This is expected at shutdown. Exiting.", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 35015c2..bd68dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +67,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private long monitoringInterval;
private MonitoringThread monitoringThread;
+ private CGroupElasticMemoryController oomListenerThread;
private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs;
private long containerMetricsUnregisterDelayMs;
@@ -85,6 +89,8 @@ public class ContainersMonitorImpl extends AbstractService implements
private boolean pmemCheckEnabled;
private boolean vmemCheckEnabled;
+ private boolean elasticMemoryEnforcement;
+ private boolean strictMemoryEnforcement;
private boolean containersMonitorEnabled;
private long maxVCoresAllottedForContainers;
@@ -173,8 +179,37 @@ public class ContainersMonitorImpl extends AbstractService implements
vmemCheckEnabled = this.conf.getBoolean(
YarnConfiguration.NM_VMEM_CHECK_ENABLED,
YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
+ elasticMemoryEnforcement = this.conf.getBoolean(
+ YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED,
+ YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED);
+ strictMemoryEnforcement = conf.getBoolean(
+ YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
+ YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
+ LOG.info("Elastic memory control enabled: " + elasticMemoryEnforcement);
+ LOG.info("Strict memory control enabled: " + strictMemoryEnforcement);
+
+ if (elasticMemoryEnforcement) {
+ if (!CGroupElasticMemoryController.isAvailable()) {
+ // Test for availability outside the constructor
+ // to be able to write non-Linux unit tests for
+ // CGroupElasticMemoryController
+ throw new YarnException(
+ "CGroup Elastic Memory controller enabled but " +
+ "it is not available. Exiting.");
+ } else {
+ this.oomListenerThread = new CGroupElasticMemoryController(
+ conf,
+ context,
+ ResourceHandlerModule.getCGroupsHandler(),
+ pmemCheckEnabled,
+ vmemCheckEnabled,
+ pmemCheckEnabled ?
+ maxPmemAllottedForContainers : maxVmemAllottedForContainers
+ );
+ }
+ }
containersMonitorEnabled =
isContainerMonitorEnabled() && monitoringInterval > 0;
@@ -246,6 +281,9 @@ public class ContainersMonitorImpl extends AbstractService implements
if (containersMonitorEnabled) {
this.monitoringThread.start();
}
+ if (oomListenerThread != null) {
+ oomListenerThread.start();
+ }
super.serviceStart();
}
@@ -259,6 +297,14 @@ public class ContainersMonitorImpl extends AbstractService implements
} catch (InterruptedException e) {
LOG.info("ContainersMonitorImpl monitoring thread interrupted");
}
+ if (this.oomListenerThread != null) {
+ this.oomListenerThread.stopListening();
+ try {
+ this.oomListenerThread.join();
+ } finally {
+ this.oomListenerThread = null;
+ }
+ }
}
super.serviceStop();
}
@@ -651,6 +697,10 @@ public class ContainersMonitorImpl extends AbstractService implements
ProcessTreeInfo ptInfo,
long currentVmemUsage,
long currentPmemUsage) {
+ if (elasticMemoryEnforcement || strictMemoryEnforcement) {
+ // We enforce the overall memory usage instead of individual containers
+ return;
+ }
boolean isMemoryOverLimit = false;
long vmemLimit = ptInfo.getVmemLimit();
long pmemLimit = ptInfo.getPmemLimit();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
index 56b571b..5b911b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.yarn.server.nodemanager.executor;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
@@ -93,4 +94,44 @@ public final class ContainerSignalContext {
public Signal getSignal() {
return this.signal;
}
+
+ /**
+ * Return true if we are trying to signal the same process.
+ * @param obj compare to this object
+ * @return whether we try to signal the same process id
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ContainerSignalContext) {
+ ContainerSignalContext other = (ContainerSignalContext)obj;
+ boolean ret =
+ (other.getPid() == null && getPid() == null) ||
+ (other.getPid() != null && getPid() != null &&
+ other.getPid().equals(getPid()));
+ ret = ret &&
+ (other.getSignal() == null && getSignal() == null) ||
+ (other.getSignal() != null && getSignal() != null &&
+ other.getSignal().equals(getSignal()));
+ ret = ret &&
+ (other.getContainer() == null && getContainer() == null) ||
+ (other.getContainer() != null && getContainer() != null &&
+ other.getContainer().equals(getContainer()));
+ ret = ret &&
+ (other.getUser() == null && getUser() == null) ||
+ (other.getUser() != null && getUser() != null &&
+ other.getUser().equals(getUser()));
+ return ret;
+ }
+ return super.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().
+ append(getPid()).
+ append(getSignal()).
+ append(getContainer()).
+ append(getUser()).
+ toHashCode();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
new file mode 100644
index 0000000..0086b26
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <sys/param.h>
+#include <poll.h>
+#include "oom_listener.h"
+
+/*
+ * Print an error.
+*/
+static inline void print_error(const char *file, const char *message,
+ ...) {
+ fprintf(stderr, "%s ", file);
+ va_list arguments;
+ va_start(arguments, message);
+ vfprintf(stderr, message, arguments);
+ va_end(arguments);
+}
+
+/*
+ * Listen to OOM events in a memory cgroup. See declaration for details.
+ */
+int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd) {
+ const char *pattern =
+ cgroup[MAX(strlen(cgroup), 1) - 1] == '/'
+ ? "%s%s" :"%s/%s";
+
+ /* Create an event handle, if we do not have one already */
+ if (descriptors->event_fd == -1 &&
+ (descriptors->event_fd = eventfd(0, 0)) == -1) {
+ print_error(descriptors->command, "eventfd() failed. errno:%d %s\n",
+ errno, strerror(errno));
+ return EXIT_FAILURE;
+ }
+
+ /*
+ * open the file to listen to (memory.oom_control)
+ * and write the event handle and the file handle
+ * to cgroup.event_control
+ */
+ if (snprintf(descriptors->event_control_path,
+ sizeof(descriptors->event_control_path),
+ pattern,
+ cgroup,
+ "cgroup.event_control") < 0) {
+ print_error(descriptors->command, "path too long %s\n", cgroup);
+ return EXIT_FAILURE;
+ }
+
+ if ((descriptors->event_control_fd = open(
+ descriptors->event_control_path,
+ O_WRONLY|O_CREAT, 0600)) == -1) {
+ print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
+ descriptors->event_control_path,
+ errno, strerror(errno));
+ return EXIT_FAILURE;
+ }
+
+ if (snprintf(descriptors->oom_control_path,
+ sizeof(descriptors->oom_control_path),
+ pattern,
+ cgroup,
+ "memory.oom_control") < 0) {
+ print_error(descriptors->command, "path too long %s\n", cgroup);
+ return EXIT_FAILURE;
+ }
+
+ if ((descriptors->oom_control_fd = open(
+ descriptors->oom_control_path,
+ O_RDONLY)) == -1) {
+ print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
+ descriptors->oom_control_path,
+ errno, strerror(errno));
+ return EXIT_FAILURE;
+ }
+
+ if ((descriptors->oom_command_len = (size_t) snprintf(
+ descriptors->oom_command,
+ sizeof(descriptors->oom_command),
+ "%d %d",
+ descriptors->event_fd,
+ descriptors->oom_control_fd)) < 0) {
+ print_error(descriptors->command, "Could print %d %d\n",
+ descriptors->event_control_fd,
+ descriptors->oom_control_fd);
+ return EXIT_FAILURE;
+ }
+
+ if (write(descriptors->event_control_fd,
+ descriptors->oom_command,
+ descriptors->oom_command_len) == -1) {
+ print_error(descriptors->command, "Could not write to %s errno:%d\n",
+ descriptors->event_control_path, errno);
+ return EXIT_FAILURE;
+ }
+
+ if (close(descriptors->event_control_fd) == -1) {
+ print_error(descriptors->command, "Could not close %s errno:%d\n",
+ descriptors->event_control_path, errno);
+ return EXIT_FAILURE;
+ }
+ descriptors->event_control_fd = -1;
+
+ /*
+ * Listen to events as long as the cgroup exists
+ * and forward them to the fd in the argument.
+ */
+ for (;;) {
+ uint64_t u;
+ ssize_t ret = 0;
+ struct stat stat_buffer = {0};
+ struct pollfd poll_fd = {
+ .fd = descriptors->event_fd,
+ .events = POLLIN
+ };
+
+ ret = poll(&poll_fd, 1, descriptors->watch_timeout);
+ if (ret < 0) {
+ /* Error calling poll */
+ print_error(descriptors->command,
+ "Could not poll eventfd %d errno:%d %s\n", ret,
+ errno, strerror(errno));
+ return EXIT_FAILURE;
+ }
+
+ if (ret > 0) {
+ /* Event counter values are always 8 bytes */
+ if ((ret = read(descriptors->event_fd, &u, sizeof(u))) != sizeof(u)) {
+ print_error(descriptors->command,
+ "Could not read from eventfd %d errno:%d %s\n", ret,
+ errno, strerror(errno));
+ return EXIT_FAILURE;
+ }
+
+ /* Forward the value to the caller, typically stdout */
+ if ((ret = write(fd, &u, sizeof(u))) != sizeof(u)) {
+ print_error(descriptors->command,
+ "Could not write to pipe %d errno:%d %s\n", ret,
+ errno, strerror(errno));
+ return EXIT_FAILURE;
+ }
+ } else if (ret == 0) {
+ /* Timeout has elapsed */
+
+ /* Quit, if the cgroup is deleted */
+ if (stat(cgroup, &stat_buffer) != 0) {
+ break;
+ }
+ }
+ }
+ return EXIT_SUCCESS;
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
new file mode 100644
index 0000000..aa77cb6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/eventfd.h>
+#include <sys/stat.h>
+
+#include <linux/limits.h>
+
+/*
+This file implements a standard cgroups out of memory listener.
+*/
+
+typedef struct _oom_listener_descriptors {
+ /*
+ * Command line that was called to run this process.
+ */
+ const char *command;
+ /*
+ * Event descriptor to watch.
+ * It is filled in by the function,
+ * if not specified, yet.
+ */
+ int event_fd;
+ /*
+ * cgroup.event_control file handle
+ */
+ int event_control_fd;
+ /*
+ * memory.oom_control file handle
+ */
+ int oom_control_fd;
+ /*
+ * cgroup.event_control path
+ */
+ char event_control_path[PATH_MAX];
+ /*
+ * memory.oom_control path
+ */
+ char oom_control_path[PATH_MAX];
+ /*
+ * Control command to write to
+ * cgroup.event_control
+ * Filled by the function.
+ */
+ char oom_command[25];
+ /*
+ * Length of oom_command filled by the function.
+ */
+ size_t oom_command_len;
+ /*
+ * Directory watch timeout
+ */
+ int watch_timeout;
+} _oom_listener_descriptors;
+
+/*
+ Clean up allocated resources in a descriptor structure
+*/
+inline void cleanup(_oom_listener_descriptors *descriptors) {
+ close(descriptors->event_fd);
+ descriptors->event_fd = -1;
+ close(descriptors->event_control_fd);
+ descriptors->event_control_fd = -1;
+ close(descriptors->oom_control_fd);
+ descriptors->oom_control_fd = -1;
+ descriptors->watch_timeout = 1000;
+}
+
+/*
+ * Enable an OOM listener on the memory cgroup cgroup
+ * descriptors: Structure that holds state for testing purposes
+ * cgroup: cgroup path to watch. It has to be a memory cgroup
+ * fd: File to forward events to. Normally this is stdout
+ */
+int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd);
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
new file mode 100644
index 0000000..eb7fc3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+
+#include "oom_listener.h"
+
/*
 * Print the command usage to stderr and terminate with failure.
 * Called when the arguments do not match any supported mode.
 * Does not return.
 *
 * Fix: the first two messages were missing their trailing '\n'
 * (unlike every other line), so the banner, the description and the
 * first usage line ran together on a single line of output.
 */
void print_usage(void) {
  fprintf(stderr, "oom-listener\n");
  fprintf(stderr, "Listen to OOM events in a cgroup\n");
  fprintf(stderr, "usage to listen: oom-listener <cgroup directory>\n");
  fprintf(stderr, "usage to test: oom-listener oom [<pgid>]\n");
  fprintf(stderr, "example listening: oom-listener /sys/fs/cgroup/memory/hadoop-yarn | xxd -c 8\n");
  fprintf(stderr, "example oom to test: bash -c 'echo $$ >/sys/fs/cgroup/memory/hadoop-yarn/tasks;oom-listener oom'\n");
  fprintf(stderr, "example container overload: sudo -u <user> bash -c 'echo $$ && oom-listener oom 0' >/sys/fs/cgroup/memory/hadoop-yarn/<container>/tasks\n");
  exit(EXIT_FAILURE);
}
+
+/*
+ Test an OOM situation adding the pid
+ to the group pgid and calling malloc in a loop
+ This can be used to test OOM listener. See examples above.
+ Never returns; exits with status 1 only if malloc itself fails.
+*/
+void test_oom_infinite(char* pgids) {
+ if (pgids != NULL) {
+ int pgid = atoi(pgids);
+ /* NOTE(review): the setpgid return value is ignored - on failure we
+    simply stay in our own process group, which only affects how the
+    manual test is accounted. Confirm this is intentional. */
+ setpgid(0, pgid);
+ }
+ while(1) {
+ char* p = (char*)malloc(4096);
+ if (p != NULL) {
+ /* touch the allocation so the memory is actually committed */
+ p[0] = 0xFF;
+ } else {
+ exit(1);
+ }
+ }
+}
+
+/*
+ A command that receives a memory cgroup directory and
+ listens to the events in the directory.
+ It will print a new line on every out of memory event
+ to the standard output.
+ usage:
+ oom-listener <cgroup>
+ The exit code is the result of oom_listener(), non-zero on error.
+*/
+int main(int argc, char *argv[]) {
+ /* "oom" test mode: become a memory hog for manual testing;
+    test_oom_infinite() never returns.
+    NOTE(review): strcmp needs <string.h>, which is not included in
+    this file - presumably pulled in via oom_listener.h; confirm. */
+ if (argc >= 2 &&
+ strcmp(argv[1], "oom") == 0)
+ test_oom_infinite(argc < 3 ? NULL : argv[2]);
+
+ /* listening mode requires exactly one argument: the cgroup path */
+ if (argc != 2)
+ print_usage();
+
+ /* all fds start at -1 so cleanup() is safe at any point */
+ _oom_listener_descriptors descriptors = {
+ .command = argv[0],
+ .event_fd = -1,
+ .event_control_fd = -1,
+ .oom_control_fd = -1,
+ .event_control_path = {0},
+ .oom_control_path = {0},
+ .oom_command = {0},
+ .oom_command_len = 0,
+ .watch_timeout = 1000
+ };
+
+ /* forward OOM events from the cgroup to stdout until the cgroup
+    disappears or an error occurs */
+ int ret = oom_listener(&descriptors, argv[1], STDOUT_FILENO);
+
+ cleanup(&descriptors);
+
+ return ret;
+}
+
+#else
+
+/*
+ This tool uses Linux specific functionality,
+ so it is not available for other operating systems
+*/
+int main() {
+ /* always fail so a misconfigured (non-Linux) deployment is caught */
+ return 1;
+}
+
+#endif
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
new file mode 100644
index 0000000..9627632
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if __linux
+
+extern "C" {
+#include "oom_listener.h"
+}
+
+#include <gtest/gtest.h>
+#include <fstream>
+#include <mutex>
+
+#define CGROUP_ROOT "/sys/fs/cgroup/memory/"
+#define TEST_ROOT "/tmp/test-oom-listener/"
+#define CGROUP_TASKS "tasks"
+#define CGROUP_OOM_CONTROL "memory.oom_control"
+#define CGROUP_LIMIT_PHYSICAL "memory.limit_in_bytes"
+#define CGROUP_LIMIT_SWAP "memory.memsw.limit_in_bytes"
+#define CGROUP_EVENT_CONTROL "cgroup.event_control"
+#define CGROUP_LIMIT (5 * 1024 * 1024)
+
+// We try multiple cgroup directories
+// We try first the official path to test
+// in production
+// If we are running as a user we fall back
+// to mock cgroup
+static const char *cgroup_candidates[] = { CGROUP_ROOT, TEST_ROOT };
+
+// Standard Google Test entry point: consume gtest flags, run all tests.
+int main(int argc, char **argv) {
+ testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+class OOMListenerTest : public ::testing::Test {
+private:
+ char cgroup[PATH_MAX] = {};
+ const char* cgroup_root = nullptr;
+public:
+ OOMListenerTest() = default;
+
+ virtual ~OOMListenerTest() = default;
+ virtual const char* GetCGroup() { return cgroup; }
+ virtual void SetUp() {
+ struct stat cgroup_memory = {};
+ for (unsigned int i = 0; i < GTEST_ARRAY_SIZE_(cgroup_candidates); ++i) {
+ cgroup_root = cgroup_candidates[i];
+
+ // Try to create the root.
+ // We might not have permission and
+ // it may already exist
+ mkdir(cgroup_root, 0700);
+
+ if (0 != stat(cgroup_root, &cgroup_memory)) {
+ printf("%s missing. Skipping test\n", cgroup_root);
+ continue;
+ }
+
+ timespec timespec1 = {};
+ if (0 != clock_gettime(CLOCK_MONOTONIC, ×pec1)) {
+ ASSERT_TRUE(false) << " clock_gettime failed\n";
+ }
+
+ if (snprintf(cgroup, sizeof(cgroup), "%s%lx/",
+ cgroup_root, timespec1.tv_nsec) <= 0) {
+ cgroup[0] = '\0';
+ printf("%s snprintf failed\n", cgroup_root);
+ continue;
+ }
+
+ // Create a cgroup named the current timestamp
+ // to make it quasi unique
+ if (0 != mkdir(cgroup, 0700)) {
+ printf("%s not writable.\n", cgroup);
+ continue;
+ }
+ break;
+ }
+
+ ASSERT_EQ(0, stat(cgroup, &cgroup_memory))
+ << "Cannot use or simulate cgroup " << cgroup;
+ }
+ virtual void TearDown() {
+ if (cgroup[0] != '\0') {
+ rmdir(cgroup);
+ }
+ if (cgroup_root != nullptr &&
+ cgroup_root != cgroup_candidates[0]) {
+ rmdir(cgroup_root);
+ }
+ }
+};
+
+/*
+ Unit test for cgroup testing. There are two modes.
+ If the unit test is run as root and we have cgroups
+ we try to create a cgroup and generate an OOM.
+ If we are not running as root we just sleep instead of
+ hogging memory and simulate the OOM by sending
+ an event in a mock event fd mock_oom_event_as_user.
+*/
+TEST_F(OOMListenerTest, test_oom) {
+ // Disable OOM killer (echoing 1 into memory.oom_control makes the
+ // cgroup pause the task at the limit instead of killing it, so this
+ // test can observe the event and "kill" the child itself below)
+ std::ofstream oom_control;
+ std::string oom_control_file =
+ std::string(GetCGroup()).append(CGROUP_OOM_CONTROL);
+ oom_control.open(oom_control_file.c_str(), oom_control.out);
+ oom_control << 1 << std::endl;
+ oom_control.close();
+
+ // Set a low enough limit for physical
+ std::ofstream limit;
+ std::string limit_file =
+ std::string(GetCGroup()).append(CGROUP_LIMIT_PHYSICAL);
+ limit.open(limit_file.c_str(), limit.out);
+ limit << CGROUP_LIMIT << std::endl;
+ limit.close();
+
+ // Set a low enough limit for physical + swap
+ std::ofstream limitSwap;
+ std::string limit_swap_file =
+ std::string(GetCGroup()).append(CGROUP_LIMIT_SWAP);
+ limitSwap.open(limit_swap_file.c_str(), limitSwap.out);
+ limitSwap << CGROUP_LIMIT << std::endl;
+ limitSwap.close();
+
+ // Event control file to set
+ std::string memory_control_file =
+ std::string(GetCGroup()).append(CGROUP_EVENT_CONTROL);
+
+ // Tasks file to check
+ std::string tasks_file =
+ std::string(GetCGroup()).append(CGROUP_TASKS);
+
+ // If cgroup.event_control does not exist we are in the mock
+ // directory, so OOM events will be simulated through an eventfd
+ int mock_oom_event_as_user = -1;
+ struct stat stat1 = {};
+ if (0 != stat(memory_control_file.c_str(), &stat1)) {
+ // We cannot tamper with cgroups
+ // running as a user, so simulate an
+ // oom event
+ mock_oom_event_as_user = eventfd(0, 0);
+ }
+ const int simulate_cgroups =
+ mock_oom_event_as_user != -1;
+
+ // First child: the "memory hog" (or a sleeper in simulated mode)
+ __pid_t mem_hog_pid = fork();
+ if (!mem_hog_pid) {
+ // Child process to consume too much memory
+ if (simulate_cgroups) {
+ std::cout << "Simulating cgroups OOM" << std::endl;
+ for (;;) {
+ sleep(1);
+ }
+ } else {
+ // Wait until we are added to the cgroup
+ // so that it is accounted for our mem
+ // usage
+ __pid_t cgroupPid;
+ do {
+ std::ifstream tasks;
+ tasks.open(tasks_file.c_str(), tasks.in);
+ tasks >> cgroupPid;
+ tasks.close();
+ } while (cgroupPid != getpid());
+
+ // Start consuming as much memory as we can.
+ // cgroup will stop us at CGROUP_LIMIT
+ const int bufferSize = 1024 * 1024;
+ std::cout << "Consuming too much memory" << std::endl;
+ for (;;) {
+ auto buffer = (char *) malloc(bufferSize);
+ if (buffer != nullptr) {
+ // touch every byte so the pages are really committed
+ for (int i = 0; i < bufferSize; ++i) {
+ buffer[i] = (char) std::rand();
+ }
+ }
+ }
+ }
+ } else {
+ // Parent test
+ ASSERT_GE(mem_hog_pid, 1) << "Fork failed " << errno;
+
+ // Put child into cgroup
+ std::ofstream tasks;
+ tasks.open(tasks_file.c_str(), tasks.out);
+ tasks << mem_hog_pid << std::endl;
+ tasks.close();
+
+ // Create pipe to get forwarded eventfd
+ int test_pipe[2];
+ ASSERT_EQ(0, pipe(test_pipe));
+
+ // Launch OOM listener
+ // There is no race condition with the process
+ // running out of memory. If oom is 1 at startup
+ // oom_listener will send an initial notification
+ __pid_t listener = fork();
+ if (listener == 0) {
+ // child listener forwarding cgroup events
+ // (in simulated mode event_fd is the pre-made mock eventfd)
+ _oom_listener_descriptors descriptors = {
+ .command = "test",
+ .event_fd = mock_oom_event_as_user,
+ .event_control_fd = -1,
+ .oom_control_fd = -1,
+ .event_control_path = {0},
+ .oom_control_path = {0},
+ .oom_command = {0},
+ .oom_command_len = 0,
+ .watch_timeout = 100
+ };
+ int ret = oom_listener(&descriptors, GetCGroup(), test_pipe[1]);
+ cleanup(&descriptors);
+ close(test_pipe[0]);
+ close(test_pipe[1]);
+ exit(ret);
+ } else {
+ // Parent test
+ uint64_t event_id = 1;
+ if (simulate_cgroups) {
+ // We cannot tamper with cgroups
+ // running as a user, so simulate an
+ // oom event
+ ASSERT_EQ(sizeof(event_id),
+ write(mock_oom_event_as_user,
+ &event_id,
+ sizeof(event_id)));
+ }
+ // Block until the listener forwards one 8-byte event counter
+ ASSERT_EQ(sizeof(event_id),
+ read(test_pipe[0],
+ &event_id,
+ sizeof(event_id)))
+ << "The event has not arrived";
+ close(test_pipe[0]);
+ close(test_pipe[1]);
+
+ // Simulate OOM killer
+ ASSERT_EQ(0, kill(mem_hog_pid, SIGKILL));
+
+ // Verify that process was killed
+ // NOTE(review): __WAIT_STATUS is a deprecated glibc-internal type
+ // (removed from newer glibc); wait() portably takes int*. Since the
+ // value here is a null pointer, wait() discards the status and the
+ // ASSERT_EQ(nullptr, ...) below is vacuously true - consider
+ // `int status; wait(&status)`. Confirm the target toolchains.
+ __WAIT_STATUS mem_hog_status = {};
+ __pid_t exited0 = wait(mem_hog_status);
+ ASSERT_EQ(mem_hog_pid, exited0)
+ << "Wrong process exited";
+ ASSERT_EQ(nullptr, mem_hog_status)
+ << "Test process killed with invalid status";
+
+ // In mock mode the cgroup "files" are plain files that must be
+ // unlinked before the directory can be removed
+ if (mock_oom_event_as_user != -1) {
+ ASSERT_EQ(0, unlink(oom_control_file.c_str()));
+ ASSERT_EQ(0, unlink(limit_file.c_str()));
+ ASSERT_EQ(0, unlink(limit_swap_file.c_str()));
+ ASSERT_EQ(0, unlink(tasks_file.c_str()));
+ ASSERT_EQ(0, unlink(memory_control_file.c_str()));
+ }
+ // Once the cgroup is empty delete it
+ ASSERT_EQ(0, rmdir(GetCGroup()))
+ << "Could not delete cgroup " << GetCGroup();
+
+ // Check that oom_listener exited on the deletion of the cgroup
+ // NOTE(review): same __WAIT_STATUS / null-status concern as above
+ __WAIT_STATUS oom_listener_status = {};
+ __pid_t exited1 = wait(oom_listener_status);
+ ASSERT_EQ(listener, exited1)
+ << "Wrong process exited";
+ ASSERT_EQ(nullptr, oom_listener_status)
+ << "Listener process exited with invalid status";
+ }
+ }
+}
+
+#else
+/*
+This tool covers Linux specific functionality,
+so it is not available for other operating systems
+*/
+int main() {
+ /* always fail: these tests cannot run on non-Linux platforms */
+ return 1;
+}
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9686584f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
new file mode 100644
index 0000000..54bcb13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Runnable that does not do anything.
+ * The {@code (Context, boolean)} constructor exists so the class can be
+ * instantiated through the pluggable OOM-handler path exercised by the
+ * tests; both arguments are deliberately ignored.
+ * NOTE(review): presumably matched against the constructor signature
+ * looked up reflectively by CGroupElasticMemoryController - confirm.
+ */
+public class DummyRunnableWithContext implements Runnable {
+ public DummyRunnableWithContext(Context context, boolean virtual) {
+ // Arguments intentionally unused.
+ }
+ @Override
+ public void run() {
+ // No-op by design.
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org