You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/03/24 07:47:42 UTC
[flink] 03/04: [FLINK-15989] Improve direct out-of-memory error
handling in MemorySegmentFactory
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 25ec89b8dae175246fd28972f8dbf7d479e89b5f
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Mar 19 16:16:58 2020 +0300
[FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory
---
.../flink/core/memory/MemorySegmentFactory.java | 25 +++++++++++++++++++-
.../java/org/apache/flink/util/ExceptionUtils.java | 27 +++++++++++++++++++++-
2 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c297a26..760d2ac 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -19,6 +19,10 @@
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
@@ -32,6 +36,7 @@ import java.nio.ByteBuffer;
*/
@Internal
public final class MemorySegmentFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class);
/**
* Creates a new memory segment that targets the given heap memory region.
@@ -94,10 +99,28 @@ public final class MemorySegmentFactory {
* @return A new memory segment, backed by unpooled off-heap memory.
*/
public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
- ByteBuffer memory = ByteBuffer.allocateDirect(size);
+ ByteBuffer memory = allocateDirectMemory(size); // delegates to the helper so an OutOfMemoryError gets enriched (and logged for direct OOM) before propagating
return new HybridMemorySegment(memory, owner, null);
}
+ private static ByteBuffer allocateDirectMemory(int size) { // allocates a direct ByteBuffer of the given capacity; an OutOfMemoryError is rethrown, possibly with an enriched message
+ //noinspection ErrorNotRethrown
+ try {
+ return ByteBuffer.allocateDirect(size);
+ } catch (OutOfMemoryError outOfMemoryError) {
+ // TODO: this error handling can be removed in the future,
+ // once we find a common way to handle OOM errors in netty threads.
+ // Here we enrich the error so that a more helpful OOM message is propagated
+ // to the receiver if the failure happens in a netty thread.
+ OutOfMemoryError enrichedOutOfMemoryError = (OutOfMemoryError) ExceptionUtils // NOTE(review): cast presumes the enriched Throwable is still an OutOfMemoryError -- confirm in changeOutOfMemoryErrorMessage
+ .enrichTaskManagerOutOfMemoryError(outOfMemoryError);
+ if (ExceptionUtils.isDirectOutOfMemoryError(outOfMemoryError)) { // only direct OOMs are logged here; the check uses the original error
+ LOG.error("Cannot allocate direct memory segment", enrichedOutOfMemoryError);
+ }
+ throw enrichedOutOfMemoryError; // always rethrown, whether or not the message was enriched
+ }
+ }
+
/**
* Allocates an off-heap unsafe memory and creates a new memory segment to represent that memory.
*
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 5fc1bfe..0a63ae5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -49,6 +49,19 @@ public final class ExceptionUtils {
/** The stringified representation of a null exception reference. */
public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
+ private static final String TM_DIRECT_OOM_ERROR_MESSAGE = String.format( // message used by enrichTaskManagerOutOfMemoryError for direct OOMs; it keeps the "Direct buffer memory" prefix that isDirectOutOfMemoryError matches on
+ "Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) " +
+ "a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be " +
+ "allocated by user code or some of its dependencies. In this case '%s' configuration option should be " +
+ "increased. Flink framework and its dependencies also consume the direct memory, mostly for network " +
+ "communication. The most of network memory is managed by Flink and should not result in out-of-memory " +
+ "error. In certain special cases, in particular for jobs with high parallelism, the framework may " +
+ "require more direct memory which is not managed by Flink. In this case '%s' configuration option " +
+ "should be increased. If the error persists then there is probably a direct memory leak which has to " +
+ "be investigated and fixed. The task executor has to be shutdown...",
+ TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), // fills the first '%s': the option named for direct memory used by user code
+ TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key()); // fills the second '%s': the option named for direct memory used by the framework
+
private static final String TM_METASPACE_OOM_ERROR_MESSAGE = String.format(
"Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires " +
"a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case " +
@@ -121,7 +134,7 @@ public final class ExceptionUtils {
/**
* Generates new {@link OutOfMemoryError} with more detailed message.
*
- * <p>This method improves error message for metaspace {@link OutOfMemoryError}.
+ * <p>This method improves error message for direct and metaspace {@link OutOfMemoryError}.
* It adds description of possible causes and ways of resolution.
*
* @param exception The exception to enrich.
@@ -130,6 +143,8 @@ public final class ExceptionUtils {
public static Throwable enrichTaskManagerOutOfMemoryError(Throwable exception) {
if (isMetaspaceOutOfMemoryError(exception)) { // metaspace is checked first, then direct memory
return changeOutOfMemoryErrorMessage(exception, TM_METASPACE_OOM_ERROR_MESSAGE);
+ } else if (isDirectOutOfMemoryError(exception)) { // direct-memory OOM gets its own detailed message
+ return changeOutOfMemoryErrorMessage(exception, TM_DIRECT_OOM_ERROR_MESSAGE);
}
return exception; // anything else is handed back unchanged (same instance)
}
@@ -155,6 +170,16 @@ public final class ExceptionUtils {
return isOutOfMemoryErrorWithMessageStartingWith(t, "Metaspace");
}
+ /**
+ * Checks whether the given exception is a JVM direct out-of-memory error, recognized by a message starting with "Direct buffer memory".
+ *
+ * @param t The exception to check.
+ * @return True, if the exception is the direct {@link OutOfMemoryError}, false otherwise.
+ */
+ public static boolean isDirectOutOfMemoryError(Throwable t) {
+ return isOutOfMemoryErrorWithMessageStartingWith(t, "Direct buffer memory"); // prefix match is delegated to the shared helper also used for the metaspace check
+ }
+
private static boolean isOutOfMemoryErrorWithMessageStartingWith(Throwable t, String prefix) {
// the exact matching of the class is checked to avoid matching any custom subclasses of OutOfMemoryError
// as we are interested in the original exceptions, generated by JVM.