You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by az...@apache.org on 2020/03/24 07:47:39 UTC

[flink] branch release-1.10 updated (6c5621e -> 3877924)

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6c5621e  [FLINK-16567][table][doc] Update to TableConfig in query_configuration document
     new 2c934ff  [FLINK-16225] Improve metaspace out-of-memory error handling thrown in user code
     new cdf4d64  [FLINK-16225] Add JVM Metaspace Error assumption test
     new 25ec89b  [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory
     new 3877924  [FLINK-15989] Add JVM Direct Error assumption test

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/memory/MemorySegmentFactory.java    |  25 ++-
 .../java/org/apache/flink/util/ExceptionUtils.java |  77 +++++++++
 .../runtime/taskexecutor/TaskManagerRunner.java    |   8 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |   2 +
 .../apache/flink/test/util/TestProcessBuilder.java |  21 ++-
 .../flink/runtime/util/ExceptionUtilsITCases.java  | 175 +++++++++++++++++++++
 ...tractTaskManagerProcessFailureRecoveryTest.java |   2 +-
 .../recovery/ProcessFailureCancelingITCase.java    |   4 +-
 8 files changed, 304 insertions(+), 10 deletions(-)
 create mode 100644 flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java


[flink] 01/04: [FLINK-16225] Improve metaspace out-of-memory error handling thrown in user code

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c934ff2a42fe256c2a1174788cbe55c05e8e323
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Sun Mar 15 14:26:05 2020 +0300

    [FLINK-16225] Improve metaspace out-of-memory error handling thrown in user code
    
    Improve the error message by explaining the possible causes and how to resolve them.
    In case of a metaspace OOM error, attempt a graceful TaskManager (TM) shutdown.
    
    This closes #11408.
---
 .../java/org/apache/flink/util/ExceptionUtils.java | 52 ++++++++++++++++++++++
 .../runtime/taskexecutor/TaskManagerRunner.java    |  8 +++-
 .../org/apache/flink/runtime/taskmanager/Task.java |  2 +
 3 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index ddd0276..5fc1bfe 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -25,6 +25,7 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import javax.annotation.Nullable;
@@ -48,6 +49,14 @@ public final class ExceptionUtils {
 	/** The stringified representation of a null exception reference. */
 	public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
+	private static final String TM_METASPACE_OOM_ERROR_MESSAGE = String.format(
+		"Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires " +
+			"a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case " +
+			"'%s' configuration option should be increased. If the error persists (usually in cluster after " +
+			"several job (re-)submissions) then there is probably a class loading leak which has to be " +
+			"investigated and fixed. The task executor has to be shutdown...",
+		TaskManagerOptions.JVM_METASPACE.key());
+
 	/**
 	 * Makes a string representation of the exception's stack trace, or "(null)", if the
 	 * exception is null.
@@ -110,6 +119,49 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Generates new {@link OutOfMemoryError} with more detailed message.
+	 *
+	 * <p>This method improves error message for metaspace {@link OutOfMemoryError}.
+	 * It adds description of possible causes and ways of resolution.
+	 *
+	 * @param exception The exception to enrich.
+	 * @return either enriched exception if needed or the original one.
+	 */
+	public static Throwable enrichTaskManagerOutOfMemoryError(Throwable exception) {
+		if (isMetaspaceOutOfMemoryError(exception)) {
+			return changeOutOfMemoryErrorMessage(exception, TM_METASPACE_OOM_ERROR_MESSAGE);
+		}
+		return exception;
+	}
+
+	private static OutOfMemoryError changeOutOfMemoryErrorMessage(Throwable exception, String newMessage) {
+		Preconditions.checkArgument(exception instanceof OutOfMemoryError);
+		if (exception.getMessage().equals(newMessage)) {
+			return (OutOfMemoryError) exception;
+		}
+		OutOfMemoryError newError = new OutOfMemoryError(newMessage);
+		newError.initCause(exception.getCause());
+		newError.setStackTrace(exception.getStackTrace());
+		return newError;
+	}
+
+	/**
+	 * Checks whether the given exception indicates a JVM metaspace out-of-memory error.
+	 *
+	 * @param t The exception to check.
+	 * @return True, if the exception is the metaspace {@link OutOfMemoryError}, false otherwise.
+	 */
+	public static boolean isMetaspaceOutOfMemoryError(Throwable t) {
+		return isOutOfMemoryErrorWithMessageStartingWith(t, "Metaspace");
+	}
+
+	private static boolean isOutOfMemoryErrorWithMessageStartingWith(Throwable t, String prefix) {
+		// the exact matching of the class is checked to avoid matching any custom subclasses of OutOfMemoryError
+		// as we are interested in the original exceptions, generated by JVM.
+		return t.getClass() == OutOfMemoryError.class && t.getMessage() != null && t.getMessage().startsWith(prefix);
+	}
+
+	/**
 	 * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM.
 	 * See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a definition of fatal errors.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index ca93e32..8ed4fe4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -246,9 +246,13 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 
 	@Override
 	public void onFatalError(Throwable exception) {
-		LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);
+		Throwable enrichedException = ExceptionUtils.enrichTaskManagerOutOfMemoryError(exception);
+		LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", enrichedException);
 
-		if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
+		// In case of the Metaspace OutOfMemoryError, we expect that the graceful shutdown is possible,
+		// as it does not usually require more class loading to fail again with the Metaspace OutOfMemoryError.
+		if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(enrichedException) &&
+				!ExceptionUtils.isMetaspaceOutOfMemoryError(enrichedException)) {
 			terminateJVM();
 		} else {
 			closeAsync();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3723f1a..689ada8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -741,6 +741,8 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 			// an exception was thrown as a side effect of cancelling
 			// ----------------------------------------------------------------
 
+			t = ExceptionUtils.enrichTaskManagerOutOfMemoryError(t);
+
 			try {
 				// check if the exception is unrecoverable
 				if (ExceptionUtils.isJvmFatalError(t) ||


[flink] 02/04: [FLINK-16225] Add JVM Metaspace Error assumption test

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdf4d6433ee7433553d7b516666018838e508c7c
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Mar 19 11:46:46 2020 +0300

    [FLINK-16225] Add JVM Metaspace Error assumption test
---
 .../apache/flink/test/util/TestProcessBuilder.java |  21 +++-
 .../flink/runtime/util/ExceptionUtilsITCases.java  | 121 +++++++++++++++++++++
 ...tractTaskManagerProcessFailureRecoveryTest.java |   2 +-
 .../recovery/ProcessFailureCancelingITCase.java    |   4 +-
 4 files changed, 141 insertions(+), 7 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
index aad7845..d479956 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
@@ -70,10 +70,12 @@ public class TestProcessBuilder {
 		commands.addAll(mainClassArgs);
 
 		StringWriter processOutput = new StringWriter();
+		StringWriter errorOutput = new StringWriter();
 		Process process = new ProcessBuilder(commands).start();
-		new PipeForwarder(process.getErrorStream(), processOutput);
+		new PipeForwarder(process.getInputStream(), processOutput);
+		new PipeForwarder(process.getErrorStream(), errorOutput);
 
-		return new TestProcess(process, processOutput);
+		return new TestProcess(process, processOutput, errorOutput);
 	}
 
 	public TestProcessBuilder setJvmMemory(MemorySize jvmMemory) {
@@ -81,6 +83,11 @@ public class TestProcessBuilder {
 		return this;
 	}
 
+	public TestProcessBuilder addJvmArg(String arg) {
+		jvmArgs.add(arg);
+		return this;
+	}
+
 	public TestProcessBuilder addMainClassArg(String arg) {
 		mainClassArgs.add(arg);
 		return this;
@@ -100,20 +107,26 @@ public class TestProcessBuilder {
 	public static class TestProcess {
 		private final Process process;
 		private final StringWriter processOutput;
+		private final StringWriter errorOutput;
 
-		public TestProcess(Process process, StringWriter processOutput) {
+		public TestProcess(Process process, StringWriter processOutput, StringWriter errorOutput) {
 			this.process = process;
 			this.processOutput = processOutput;
+			this.errorOutput = errorOutput;
 		}
 
 		public Process getProcess() {
 			return process;
 		}
 
-		public StringWriter getOutput() {
+		public StringWriter getProcessOutput() {
 			return processOutput;
 		}
 
+		public StringWriter getErrorOutput() {
+			return errorOutput;
+		}
+
 		public void destroy() {
 			process.destroy();
 		}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
new file mode 100644
index 0000000..045babb
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.test.util.TestProcessBuilder.TestProcess;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.testutils.ClassLoaderUtils.ClassLoaderBuilder;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link  ExceptionUtils} which require to spawn JVM process and set JVM memory args.
+ */
+public class ExceptionUtilsITCases extends TestLogger {
+	private static final long INITIAL_BIG_METASPACE_SIZE = 32 * (1 << 20); // 32Mb
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testIsMetaspaceOutOfMemoryError() throws IOException, InterruptedException {
+		// load only one class and record required Metaspace
+		long okMetaspace = Long.parseLong(run(1, INITIAL_BIG_METASPACE_SIZE));
+		// load more classes to cause 'OutOfMemoryError: Metaspace'
+		assertThat(run(1000, okMetaspace), is(""));
+	}
+
+	private static String run(int numberOfLoadedClasses, long metaspaceSize) throws InterruptedException, IOException {
+		TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(DummyClassLoadingProgram.class.getName());
+		taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops");
+		taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize));
+		taskManagerProcessBuilder.addMainClassArg(Integer.toString(numberOfLoadedClasses));
+		taskManagerProcessBuilder.addMainClassArg(TEMPORARY_FOLDER.getRoot().getAbsolutePath());
+		TestProcess p = taskManagerProcessBuilder.start();
+		p.getProcess().waitFor();
+		assertThat(p.getErrorOutput().toString().trim(), is(""));
+		return p.getProcessOutput().toString().trim();
+	}
+
+	/**
+	 * Dummy java program to generate Metaspace OOM.
+	 */
+	public static class DummyClassLoadingProgram {
+		private DummyClassLoadingProgram() {
+		}
+
+		public static void main(String[] args) {
+			// trigger needed classes loaded
+			output("");
+			ExceptionUtils.isMetaspaceOutOfMemoryError(new Exception());
+
+			Collection<Class<?>> classes = new ArrayList<>();
+			int numberOfLoadedClasses = Integer.parseInt(args[0]);
+			try {
+				for (int index = 0; index < numberOfLoadedClasses; index++) {
+					classes.add(loadDummyClass(index, args[1]));
+				}
+				String out = classes.size() > 1 ? "Exception is not thrown, metaspace usage: " : "";
+				output(out + getMetaspaceUsage());
+			} catch (Throwable t) {
+				if (ExceptionUtils.isMetaspaceOutOfMemoryError(t)) {
+					return;
+				}
+				output("Wrong exception: " + t);
+			}
+		}
+
+		private static Class<?> loadDummyClass(int index, String folderToSaveSource) throws ClassNotFoundException, IOException {
+			String className = "DummyClass" + index;
+			String sourcePattern = "public class %s { @Override public String toString() { return \"%s\"; } }";
+			ClassLoaderBuilder classLoaderBuilder = ClassLoaderUtils.withRoot(new File(folderToSaveSource));
+			classLoaderBuilder.addClass(className, String.format(sourcePattern, className, "dummy"));
+			ClassLoader classLoader = classLoaderBuilder.build();
+			return Class.forName(className, true, classLoader);
+		}
+
+		private static long getMetaspaceUsage() {
+			for (MemoryPoolMXBean memoryMXBean : ManagementFactory.getMemoryPoolMXBeans()) {
+				if ("Metaspace".equals(memoryMXBean.getName())) {
+					return memoryMXBean.getUsage().getUsed();
+				}
+			}
+			throw new RuntimeException("Metaspace usage is not found");
+		}
+
+		private static void output(String text) {
+			System.out.println(text);
+		}
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index f69545b..d9c8230 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -258,7 +258,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			System.out.println("-----------------------------------------");
 			System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
 			System.out.println("-----------------------------------------");
-			System.out.println(process.getOutput().toString());
+			System.out.println(process.getErrorOutput().toString());
 			System.out.println("-----------------------------------------");
 			System.out.println("		END SPAWNED PROCESS LOG");
 			System.out.println("-----------------------------------------");
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 966b2fe..9d18395 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -237,11 +237,11 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			// all seems well :-)
 		}
 		catch (Exception e) {
-			printProcessLog("TaskManager", taskManagerProcess.getOutput().toString());
+			printProcessLog("TaskManager", taskManagerProcess.getErrorOutput().toString());
 			throw e;
 		}
 		catch (Error e) {
-			printProcessLog("TaskManager 1", taskManagerProcess.getOutput().toString());
+			printProcessLog("TaskManager 1", taskManagerProcess.getErrorOutput().toString());
 			throw e;
 		}
 		finally {


[flink] 04/04: [FLINK-15989] Add JVM Direct Error assumption test

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 387792408b95572f3de9be3b56775e33790822e8
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Mar 19 12:33:57 2020 +0300

    [FLINK-15989] Add JVM Direct Error assumption test
---
 .../flink/runtime/util/ExceptionUtilsITCases.java  | 78 ++++++++++++++++++----
 1 file changed, 66 insertions(+), 12 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
index 045babb..fd42f59 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
@@ -33,8 +33,11 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryPoolMXBean;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -43,31 +46,82 @@ import static org.junit.Assert.assertThat;
  * Tests for {@link  ExceptionUtils} which require to spawn JVM process and set JVM memory args.
  */
 public class ExceptionUtilsITCases extends TestLogger {
-	private static final long INITIAL_BIG_METASPACE_SIZE = 32 * (1 << 20); // 32Mb
+	private static final int DIRECT_MEMORY_SIZE = 10 * 1024; // 10Kb
+	private static final int DIRECT_MEMORY_ALLOCATION_PAGE_SIZE = 1024; // 1Kb
+	private static final int DIRECT_MEMORY_PAGE_NUMBER = DIRECT_MEMORY_SIZE / DIRECT_MEMORY_ALLOCATION_PAGE_SIZE;
+	private static final long INITIAL_BIG_METASPACE_SIZE = 128 * (1 << 20); // 128Mb
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
 	@Test
+	public void testIsDirectOutOfMemoryError() throws IOException, InterruptedException {
+		String className = DummyDirectAllocatingProgram.class.getName();
+		String out = run(className, Collections.emptyList(), DIRECT_MEMORY_SIZE, -1);
+		assertThat(out, is(""));
+	}
+
+	@Test
 	public void testIsMetaspaceOutOfMemoryError() throws IOException, InterruptedException {
+		String className = DummyClassLoadingProgram.class.getName();
 		// load only one class and record required Metaspace
-		long okMetaspace = Long.parseLong(run(1, INITIAL_BIG_METASPACE_SIZE));
+		String normalOut = run(className, getDummyClassLoadingProgramArgs(1), -1, INITIAL_BIG_METASPACE_SIZE);
+		long okMetaspace = Long.parseLong(normalOut);
 		// load more classes to cause 'OutOfMemoryError: Metaspace'
-		assertThat(run(1000, okMetaspace), is(""));
+		String oomOut = run(className, getDummyClassLoadingProgramArgs(1000), -1, okMetaspace);
+		assertThat(oomOut, is(""));
 	}
 
-	private static String run(int numberOfLoadedClasses, long metaspaceSize) throws InterruptedException, IOException {
-		TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(DummyClassLoadingProgram.class.getName());
-		taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops");
-		taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize));
-		taskManagerProcessBuilder.addMainClassArg(Integer.toString(numberOfLoadedClasses));
-		taskManagerProcessBuilder.addMainClassArg(TEMPORARY_FOLDER.getRoot().getAbsolutePath());
+	private static String run(
+			String className,
+			Iterable<String> args,
+			long directMemorySize,
+			long metaspaceSize) throws InterruptedException, IOException {
+		TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(className);
+		if (directMemorySize > 0) {
+			taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxDirectMemorySize=%d", directMemorySize));
+		}
+		if (metaspaceSize > 0) {
+			taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops");
+			taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize));
+		}
+		for (String arg : args) {
+			taskManagerProcessBuilder.addMainClassArg(arg);
+		}
 		TestProcess p = taskManagerProcessBuilder.start();
 		p.getProcess().waitFor();
 		assertThat(p.getErrorOutput().toString().trim(), is(""));
 		return p.getProcessOutput().toString().trim();
 	}
 
+	private static Collection<String> getDummyClassLoadingProgramArgs(int numberOfLoadedClasses) {
+		return Arrays.asList(
+			Integer.toString(numberOfLoadedClasses),
+			TEMPORARY_FOLDER.getRoot().getAbsolutePath());
+	}
+
+	/**
+	 * Dummy java program to generate Direct OOM.
+	 */
+	public static class DummyDirectAllocatingProgram {
+		private DummyDirectAllocatingProgram() {
+		}
+
+		public static void main(String[] args) {
+			try {
+				Collection<ByteBuffer> buffers = new ArrayList<>();
+				for (int page = 0; page < 2 * DIRECT_MEMORY_PAGE_NUMBER; page++) {
+					buffers.add(ByteBuffer.allocateDirect(DIRECT_MEMORY_ALLOCATION_PAGE_SIZE));
+				}
+				output("buffers: " + buffers);
+			} catch (Throwable t) {
+				if (!ExceptionUtils.isDirectOutOfMemoryError(t)) {
+					output("Wrong exception: " + t);
+				}
+			}
+		}
+	}
+
 	/**
 	 * Dummy java program to generate Metaspace OOM.
 	 */
@@ -113,9 +167,9 @@ public class ExceptionUtilsITCases extends TestLogger {
 			}
 			throw new RuntimeException("Metaspace usage is not found");
 		}
+	}
 
-		private static void output(String text) {
-			System.out.println(text);
-		}
+	private static void output(String text) {
+		System.out.println(text);
 	}
 }


[flink] 03/04: [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25ec89b8dae175246fd28972f8dbf7d479e89b5f
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Mar 19 16:16:58 2020 +0300

    [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory
---
 .../flink/core/memory/MemorySegmentFactory.java    | 25 +++++++++++++++++++-
 .../java/org/apache/flink/util/ExceptionUtils.java | 27 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c297a26..760d2ac 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -19,6 +19,10 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 
@@ -32,6 +36,7 @@ import java.nio.ByteBuffer;
  */
 @Internal
 public final class MemorySegmentFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class);
 
 	/**
 	 * Creates a new memory segment that targets the given heap memory region.
@@ -94,10 +99,28 @@ public final class MemorySegmentFactory {
 	 * @return A new memory segment, backed by unpooled off-heap memory.
 	 */
 	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
-		ByteBuffer memory = ByteBuffer.allocateDirect(size);
+		ByteBuffer memory = allocateDirectMemory(size);
 		return new HybridMemorySegment(memory, owner, null);
 	}
 
+	private static ByteBuffer allocateDirectMemory(int size) {
+		//noinspection ErrorNotRethrown
+		try {
+			return ByteBuffer.allocateDirect(size);
+		} catch (OutOfMemoryError outOfMemoryError) {
+			// TODO: this error handling can be removed in future,
+			// once we find a common way to handle OOM errors in netty threads.
+			// Here we enrich it to propagate better OOM message to the receiver
+			// if it happens in a netty thread.
+			OutOfMemoryError enrichedOutOfMemoryError = (OutOfMemoryError) ExceptionUtils
+				.enrichTaskManagerOutOfMemoryError(outOfMemoryError);
+			if (ExceptionUtils.isDirectOutOfMemoryError(outOfMemoryError)) {
+				LOG.error("Cannot allocate direct memory segment", enrichedOutOfMemoryError);
+			}
+			throw enrichedOutOfMemoryError;
+		}
+	}
+
 	/**
 	 * Allocates an off-heap unsafe memory and creates a new memory segment to represent that memory.
 	 *
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 5fc1bfe..0a63ae5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -49,6 +49,19 @@ public final class ExceptionUtils {
 	/** The stringified representation of a null exception reference. */
 	public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
+	private static final String TM_DIRECT_OOM_ERROR_MESSAGE = String.format(
+		"Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) " +
+			"a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be " +
+			"allocated by user code or some of its dependencies. In this case '%s' configuration option should be " +
+			"increased. Flink framework and its dependencies also consume the direct memory, mostly for network " +
+			"communication. The most of network memory is managed by Flink and should not result in out-of-memory " +
+			"error. In certain special cases, in particular for jobs with high parallelism, the framework may " +
+			"require more direct memory which is not managed by Flink. In this case '%s' configuration option " +
+			"should be increased. If the error persists then there is probably a direct memory leak which has to " +
+			"be investigated and fixed. The task executor has to be shutdown...",
+		TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(),
+		TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key());
+
 	private static final String TM_METASPACE_OOM_ERROR_MESSAGE = String.format(
 		"Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires " +
 			"a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case " +
@@ -121,7 +134,7 @@ public final class ExceptionUtils {
 	/**
 	 * Generates new {@link OutOfMemoryError} with more detailed message.
 	 *
-	 * <p>This method improves error message for metaspace {@link OutOfMemoryError}.
+	 * <p>This method improves error message for direct and metaspace {@link OutOfMemoryError}.
 	 * It adds description of possible causes and ways of resolution.
 	 *
 	 * @param exception The exception to enrich.
@@ -130,6 +143,8 @@ public final class ExceptionUtils {
 	public static Throwable enrichTaskManagerOutOfMemoryError(Throwable exception) {
 		if (isMetaspaceOutOfMemoryError(exception)) {
 			return changeOutOfMemoryErrorMessage(exception, TM_METASPACE_OOM_ERROR_MESSAGE);
+		} else if (isDirectOutOfMemoryError(exception)) {
+			return changeOutOfMemoryErrorMessage(exception, TM_DIRECT_OOM_ERROR_MESSAGE);
 		}
 		return exception;
 	}
@@ -155,6 +170,16 @@ public final class ExceptionUtils {
 		return isOutOfMemoryErrorWithMessageStartingWith(t, "Metaspace");
 	}
 
+	/**
+	 * Checks whether the given exception indicates a JVM direct out-of-memory error.
+	 *
+	 * @param t The exception to check.
+	 * @return True, if the exception is the direct {@link OutOfMemoryError}, false otherwise.
+	 */
+	public static boolean isDirectOutOfMemoryError(Throwable t) {
+		return isOutOfMemoryErrorWithMessageStartingWith(t, "Direct buffer memory");
+	}
+
 	private static boolean isOutOfMemoryErrorWithMessageStartingWith(Throwable t, String prefix) {
 		// the exact matching of the class is checked to avoid matching any custom subclasses of OutOfMemoryError
 		// as we are interested in the original exceptions, generated by JVM.