You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/03/24 07:47:41 UTC
[flink] 02/04: [FLINK-16225] Add JVM Metaspace Error assumption test
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cdf4d6433ee7433553d7b516666018838e508c7c
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Thu Mar 19 11:46:46 2020 +0300
[FLINK-16225] Add JVM Metaspace Error assumption test
---
.../apache/flink/test/util/TestProcessBuilder.java | 21 +++-
.../flink/runtime/util/ExceptionUtilsITCases.java | 121 +++++++++++++++++++++
...tractTaskManagerProcessFailureRecoveryTest.java | 2 +-
.../recovery/ProcessFailureCancelingITCase.java | 4 +-
4 files changed, 141 insertions(+), 7 deletions(-)
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
index aad7845..d479956 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java
@@ -70,10 +70,12 @@ public class TestProcessBuilder {
commands.addAll(mainClassArgs);
StringWriter processOutput = new StringWriter();
+ StringWriter errorOutput = new StringWriter();
Process process = new ProcessBuilder(commands).start();
- new PipeForwarder(process.getErrorStream(), processOutput);
+ new PipeForwarder(process.getInputStream(), processOutput);
+ new PipeForwarder(process.getErrorStream(), errorOutput);
- return new TestProcess(process, processOutput);
+ return new TestProcess(process, processOutput, errorOutput);
}
public TestProcessBuilder setJvmMemory(MemorySize jvmMemory) {
@@ -81,6 +83,11 @@ public class TestProcessBuilder {
return this;
}
+ public TestProcessBuilder addJvmArg(String arg) {
+ jvmArgs.add(arg);
+ return this;
+ }
+
public TestProcessBuilder addMainClassArg(String arg) {
mainClassArgs.add(arg);
return this;
@@ -100,20 +107,26 @@ public class TestProcessBuilder {
public static class TestProcess {
private final Process process;
private final StringWriter processOutput;
+ private final StringWriter errorOutput;
- public TestProcess(Process process, StringWriter processOutput) {
+ public TestProcess(Process process, StringWriter processOutput, StringWriter errorOutput) {
this.process = process;
this.processOutput = processOutput;
+ this.errorOutput = errorOutput;
}
public Process getProcess() {
return process;
}
- public StringWriter getOutput() {
+ public StringWriter getProcessOutput() {
return processOutput;
}
+ public StringWriter getErrorOutput() {
+ return errorOutput;
+ }
+
public void destroy() {
process.destroy();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
new file mode 100644
index 0000000..045babb
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.test.util.TestProcessBuilder.TestProcess;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.testutils.ClassLoaderUtils.ClassLoaderBuilder;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ExceptionUtils} which require to spawn JVM process and set JVM memory args.
+ */
+public class ExceptionUtilsITCases extends TestLogger {
+ private static final long INITIAL_BIG_METASPACE_SIZE = 32 * (1 << 20); // 32Mb
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testIsMetaspaceOutOfMemoryError() throws IOException, InterruptedException {
+ // load only one class and record required Metaspace
+ long okMetaspace = Long.parseLong(run(1, INITIAL_BIG_METASPACE_SIZE));
+ // load more classes to cause 'OutOfMemoryError: Metaspace'
+ assertThat(run(1000, okMetaspace), is(""));
+ }
+
+ private static String run(int numberOfLoadedClasses, long metaspaceSize) throws InterruptedException, IOException {
+ TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(DummyClassLoadingProgram.class.getName());
+ taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops");
+ taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize));
+ taskManagerProcessBuilder.addMainClassArg(Integer.toString(numberOfLoadedClasses));
+ taskManagerProcessBuilder.addMainClassArg(TEMPORARY_FOLDER.getRoot().getAbsolutePath());
+ TestProcess p = taskManagerProcessBuilder.start();
+ p.getProcess().waitFor();
+ assertThat(p.getErrorOutput().toString().trim(), is(""));
+ return p.getProcessOutput().toString().trim();
+ }
+
+ /**
+ * Dummy java program to generate Metaspace OOM.
+ */
+ public static class DummyClassLoadingProgram {
+ private DummyClassLoadingProgram() {
+ }
+
+ public static void main(String[] args) {
+ // trigger needed classes loaded
+ output("");
+ ExceptionUtils.isMetaspaceOutOfMemoryError(new Exception());
+
+ Collection<Class<?>> classes = new ArrayList<>();
+ int numberOfLoadedClasses = Integer.parseInt(args[0]);
+ try {
+ for (int index = 0; index < numberOfLoadedClasses; index++) {
+ classes.add(loadDummyClass(index, args[1]));
+ }
+ String out = classes.size() > 1 ? "Exception is not thrown, metaspace usage: " : "";
+ output(out + getMetaspaceUsage());
+ } catch (Throwable t) {
+ if (ExceptionUtils.isMetaspaceOutOfMemoryError(t)) {
+ return;
+ }
+ output("Wrong exception: " + t);
+ }
+ }
+
+ private static Class<?> loadDummyClass(int index, String folderToSaveSource) throws ClassNotFoundException, IOException {
+ String className = "DummyClass" + index;
+ String sourcePattern = "public class %s { @Override public String toString() { return \"%s\"; } }";
+ ClassLoaderBuilder classLoaderBuilder = ClassLoaderUtils.withRoot(new File(folderToSaveSource));
+ classLoaderBuilder.addClass(className, String.format(sourcePattern, className, "dummy"));
+ ClassLoader classLoader = classLoaderBuilder.build();
+ return Class.forName(className, true, classLoader);
+ }
+
+ private static long getMetaspaceUsage() {
+ for (MemoryPoolMXBean memoryMXBean : ManagementFactory.getMemoryPoolMXBeans()) {
+ if ("Metaspace".equals(memoryMXBean.getName())) {
+ return memoryMXBean.getUsage().getUsed();
+ }
+ }
+ throw new RuntimeException("Metaspace usage is not found");
+ }
+
+ private static void output(String text) {
+ System.out.println(text);
+ }
+ }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index f69545b..d9c8230 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -258,7 +258,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
System.out.println("-----------------------------------------");
System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
System.out.println("-----------------------------------------");
- System.out.println(process.getOutput().toString());
+ System.out.println(process.getErrorOutput().toString());
System.out.println("-----------------------------------------");
System.out.println(" END SPAWNED PROCESS LOG");
System.out.println("-----------------------------------------");
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 966b2fe..9d18395 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -237,11 +237,11 @@ public class ProcessFailureCancelingITCase extends TestLogger {
// all seems well :-)
}
catch (Exception e) {
- printProcessLog("TaskManager", taskManagerProcess.getOutput().toString());
+ printProcessLog("TaskManager", taskManagerProcess.getErrorOutput().toString());
throw e;
}
catch (Error e) {
- printProcessLog("TaskManager 1", taskManagerProcess.getOutput().toString());
+ printProcessLog("TaskManager 1", taskManagerProcess.getErrorOutput().toString());
throw e;
}
finally {