You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/03/24 07:47:41 UTC

[flink] 02/04: [FLINK-16225] Add JVM Metaspace Error assumption test

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdf4d6433ee7433553d7b516666018838e508c7c
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Mar 19 11:46:46 2020 +0300

    [FLINK-16225] Add JVM Metaspace Error assumption test
---
 .../apache/flink/test/util/TestProcessBuilder.java |  21 +++-
 .../flink/runtime/util/ExceptionUtilsITCases.java  | 121 +++++++++++++++++++++
 ...tractTaskManagerProcessFailureRecoveryTest.java |   2 +-
 .../recovery/ProcessFailureCancelingITCase.java    |   4 +-
 4 files changed, 141 insertions(+), 7 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
index aad7845..d479956 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
@@ -70,10 +70,12 @@ public class TestProcessBuilder {
 		commands.addAll(mainClassArgs);
 
 		StringWriter processOutput = new StringWriter();
+		StringWriter errorOutput = new StringWriter();
 		Process process = new ProcessBuilder(commands).start();
-		new PipeForwarder(process.getErrorStream(), processOutput);
+		new PipeForwarder(process.getInputStream(), processOutput);
+		new PipeForwarder(process.getErrorStream(), errorOutput);
 
-		return new TestProcess(process, processOutput);
+		return new TestProcess(process, processOutput, errorOutput);
 	}
 
 	public TestProcessBuilder setJvmMemory(MemorySize jvmMemory) {
@@ -81,6 +83,11 @@ public class TestProcessBuilder {
 		return this;
 	}
 
+	public TestProcessBuilder addJvmArg(String arg) {
+		jvmArgs.add(arg);
+		return this;
+	}
+
 	public TestProcessBuilder addMainClassArg(String arg) {
 		mainClassArgs.add(arg);
 		return this;
@@ -100,20 +107,26 @@ public class TestProcessBuilder {
 	public static class TestProcess {
 		private final Process process;
 		private final StringWriter processOutput;
+		private final StringWriter errorOutput;
 
-		public TestProcess(Process process, StringWriter processOutput) {
+		public TestProcess(Process process, StringWriter processOutput, StringWriter errorOutput) {
 			this.process = process;
 			this.processOutput = processOutput;
+			this.errorOutput = errorOutput;
 		}
 
 		public Process getProcess() {
 			return process;
 		}
 
-		public StringWriter getOutput() {
+		public StringWriter getProcessOutput() {
 			return processOutput;
 		}
 
+		public StringWriter getErrorOutput() {
+			return errorOutput;
+		}
+
 		public void destroy() {
 			process.destroy();
 		}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
new file mode 100644
index 0000000..045babb
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.test.util.TestProcessBuilder.TestProcess;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.testutils.ClassLoaderUtils.ClassLoaderBuilder;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ExceptionUtils} which require spawning a JVM process and setting JVM memory args.
+ */
+public class ExceptionUtilsITCases extends TestLogger {
+	private static final long INITIAL_BIG_METASPACE_SIZE = 32 * (1 << 20); // 32Mb
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testIsMetaspaceOutOfMemoryError() throws IOException, InterruptedException {
+		// load only one class and record required Metaspace
+		long okMetaspace = Long.parseLong(run(1, INITIAL_BIG_METASPACE_SIZE));
+		// load more classes to cause 'OutOfMemoryError: Metaspace'
+		assertThat(run(1000, okMetaspace), is(""));
+	}
+
+	private static String run(int numberOfLoadedClasses, long metaspaceSize) throws InterruptedException, IOException {
+		TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(DummyClassLoadingProgram.class.getName());
+		taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops");
+		taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize));
+		taskManagerProcessBuilder.addMainClassArg(Integer.toString(numberOfLoadedClasses));
+		taskManagerProcessBuilder.addMainClassArg(TEMPORARY_FOLDER.getRoot().getAbsolutePath());
+		TestProcess p = taskManagerProcessBuilder.start();
+		p.getProcess().waitFor();
+		assertThat(p.getErrorOutput().toString().trim(), is(""));
+		return p.getProcessOutput().toString().trim();
+	}
+
+	/**
+	 * Dummy Java program that generates a Metaspace OOM.
+	 */
+	public static class DummyClassLoadingProgram {
+		private DummyClassLoadingProgram() {
+		}
+
+		public static void main(String[] args) {
+			// trigger loading of the classes needed later (output and the ExceptionUtils check)
+			output("");
+			ExceptionUtils.isMetaspaceOutOfMemoryError(new Exception());
+
+			Collection<Class<?>> classes = new ArrayList<>();
+			int numberOfLoadedClasses = Integer.parseInt(args[0]);
+			try {
+				for (int index = 0; index < numberOfLoadedClasses; index++) {
+					classes.add(loadDummyClass(index, args[1]));
+				}
+				String out = classes.size() > 1 ? "Exception is not thrown, metaspace usage: " : "";
+				output(out + getMetaspaceUsage());
+			} catch (Throwable t) {
+				if (ExceptionUtils.isMetaspaceOutOfMemoryError(t)) {
+					return;
+				}
+				output("Wrong exception: " + t);
+			}
+		}
+
+		private static Class<?> loadDummyClass(int index, String folderToSaveSource) throws ClassNotFoundException, IOException {
+			String className = "DummyClass" + index;
+			String sourcePattern = "public class %s { @Override public String toString() { return \"%s\"; } }";
+			ClassLoaderBuilder classLoaderBuilder = ClassLoaderUtils.withRoot(new File(folderToSaveSource));
+			classLoaderBuilder.addClass(className, String.format(sourcePattern, className, "dummy"));
+			ClassLoader classLoader = classLoaderBuilder.build();
+			return Class.forName(className, true, classLoader);
+		}
+
+		private static long getMetaspaceUsage() {
+			for (MemoryPoolMXBean memoryMXBean : ManagementFactory.getMemoryPoolMXBeans()) {
+				if ("Metaspace".equals(memoryMXBean.getName())) {
+					return memoryMXBean.getUsage().getUsed();
+				}
+			}
+			throw new RuntimeException("Metaspace usage is not found");
+		}
+
+		private static void output(String text) {
+			System.out.println(text);
+		}
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index f69545b..d9c8230 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -258,7 +258,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			System.out.println("-----------------------------------------");
 			System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
 			System.out.println("-----------------------------------------");
-			System.out.println(process.getOutput().toString());
+			System.out.println(process.getErrorOutput().toString());
 			System.out.println("-----------------------------------------");
 			System.out.println("		END SPAWNED PROCESS LOG");
 			System.out.println("-----------------------------------------");
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 966b2fe..9d18395 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -237,11 +237,11 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 			// all seems well :-)
 		}
 		catch (Exception e) {
-			printProcessLog("TaskManager", taskManagerProcess.getOutput().toString());
+			printProcessLog("TaskManager", taskManagerProcess.getErrorOutput().toString());
 			throw e;
 		}
 		catch (Error e) {
-			printProcessLog("TaskManager 1", taskManagerProcess.getOutput().toString());
+			printProcessLog("TaskManager 1", taskManagerProcess.getErrorOutput().toString());
 			throw e;
 		}
 		finally {