You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/03/24 07:47:42 UTC

[flink] 03/04: [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25ec89b8dae175246fd28972f8dbf7d479e89b5f
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Mar 19 16:16:58 2020 +0300

    [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory
---
 .../flink/core/memory/MemorySegmentFactory.java    | 25 +++++++++++++++++++-
 .../java/org/apache/flink/util/ExceptionUtils.java | 27 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c297a26..760d2ac 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -19,6 +19,10 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
@@ -32,6 +36,7 @@ import java.nio.ByteBuffer;
  */
 @Internal
 public final class MemorySegmentFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class);
 
 	/**
 	 * Creates a new memory segment that targets the given heap memory region.
@@ -94,10 +99,28 @@ public final class MemorySegmentFactory {
 	 * @return A new memory segment, backed by unpooled off-heap memory.
 	 */
 	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
-		ByteBuffer memory = ByteBuffer.allocateDirect(size);
+		ByteBuffer memory = allocateDirectMemory(size);
 		return new HybridMemorySegment(memory, owner, null);
 	}
 
+	private static ByteBuffer allocateDirectMemory(int size) {
+		//noinspection ErrorNotRethrown
+		try {
+			return ByteBuffer.allocateDirect(size);
+		} catch (OutOfMemoryError outOfMemoryError) {
+			// TODO: this error handling can be removed in the future,
+			// once we find a common way to handle OOM errors in netty threads.
+			// Here we enrich the error to propagate a better OOM message to the receiver
+			// if it happens in a netty thread.
+			OutOfMemoryError enrichedOutOfMemoryError = (OutOfMemoryError) ExceptionUtils
+				.enrichTaskManagerOutOfMemoryError(outOfMemoryError);
+			if (ExceptionUtils.isDirectOutOfMemoryError(outOfMemoryError)) { // log only direct-memory OOMs
+				LOG.error("Cannot allocate direct memory segment", enrichedOutOfMemoryError);
+			}
+			throw enrichedOutOfMemoryError; // every caught OOM is rethrown, logged or not
+		}
+	}
+
 	/**
 	 * Allocates an off-heap unsafe memory and creates a new memory segment to represent that memory.
 	 *
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 5fc1bfe..0a63ae5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -49,6 +49,19 @@ public final class ExceptionUtils {
 	/** The stringified representation of a null exception reference. */
 	public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
+	private static final String TM_DIRECT_OOM_ERROR_MESSAGE = String.format(
+		"Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) " +
+			"a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be " +
+			"allocated by user code or some of its dependencies. In this case '%s' configuration option should be " +
+			"increased. Flink framework and its dependencies also consume the direct memory, mostly for network " +
+			"communication. The most of network memory is managed by Flink and should not result in out-of-memory " +
+			"error. In certain special cases, in particular for jobs with high parallelism, the framework may " +
+			"require more direct memory which is not managed by Flink. In this case '%s' configuration option " +
+			"should be increased. If the error persists then there is probably a direct memory leak which has to " +
+			"be investigated and fixed. The task executor has to be shutdown...",
+		TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), // 1st '%s': option for direct memory allocated by user code or its dependencies
+		TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key()); // 2nd '%s': option for framework direct memory not managed by Flink
+
 	private static final String TM_METASPACE_OOM_ERROR_MESSAGE = String.format(
 		"Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires " +
 			"a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case " +
@@ -121,7 +134,7 @@ public final class ExceptionUtils {
 	/**
 	 * Generates new {@link OutOfMemoryError} with more detailed message.
 	 *
-	 * <p>This method improves error message for metaspace {@link OutOfMemoryError}.
+	 * <p>This method improves error message for direct and metaspace {@link OutOfMemoryError}.
 	 * It adds description of possible causes and ways of resolution.
 	 *
 	 * @param exception The exception to enrich.
@@ -130,6 +143,8 @@ public final class ExceptionUtils {
 	public static Throwable enrichTaskManagerOutOfMemoryError(Throwable exception) {
 		if (isMetaspaceOutOfMemoryError(exception)) {
 			return changeOutOfMemoryErrorMessage(exception, TM_METASPACE_OOM_ERROR_MESSAGE);
+		} else if (isDirectOutOfMemoryError(exception)) {
+			return changeOutOfMemoryErrorMessage(exception, TM_DIRECT_OOM_ERROR_MESSAGE);
 		}
 		return exception;
 	}
@@ -155,6 +170,16 @@ public final class ExceptionUtils {
 		return isOutOfMemoryErrorWithMessageStartingWith(t, "Metaspace");
 	}
 
+	/**
+	 * Checks whether the given exception indicates a JVM direct out-of-memory error.
+	 * The check delegates to the message-prefix helper with the prefix {@code "Direct buffer memory"}.
+	 * @param t The exception to check.
+	 * @return True, if the exception is a direct-memory {@link OutOfMemoryError}, false otherwise.
+	 */
+	public static boolean isDirectOutOfMemoryError(Throwable t) {
+		return isOutOfMemoryErrorWithMessageStartingWith(t, "Direct buffer memory");
+	}
+
 	private static boolean isOutOfMemoryErrorWithMessageStartingWith(Throwable t, String prefix) {
 		// the exact matching of the class is checked to avoid matching any custom subclasses of OutOfMemoryError
 		// as we are interested in the original exceptions, generated by JVM.