You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/07/28 11:25:16 UTC

[flink] branch master updated: [FLINK-18581] Do not try to run GC phantom cleaners for jdk < 8u72

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f03841  [FLINK-18581] Do not try to run GC phantom cleaners for jdk < 8u72
2f03841 is described below

commit 2f03841d5414f9d4a4b810810317c0250065264e
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Jul 22 10:34:49 2020 +0300

    [FLINK-18581] Do not try to run GC phantom cleaners for jdk < 8u72
    
    The private JVM method Reference#tryHandlePending was introduced at Java 8u72.
    The explicit processing of queued phantom GC cleaners was exposed before 8u72, also is was not used while reserving JVM direct memory.
    Therefore, we can only hope that the GC will be triggered and the cleaners get processed in GC after some timeout.
    This is suboptimal, therefore the PR changes Flink to not fail if the method is unavailable but logs a warning to upgrade Java.
    
    This closes #12981.
---
 .../apache/flink/util/JavaGcCleanerWrapper.java    | 33 +++++++++++++---------
 .../flink/runtime/memory/UnsafeMemoryBudget.java   |  9 ++++--
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |  6 +++-
 3 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java b/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
index d02c9c5..1559434 100644
--- a/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
@@ -83,7 +83,6 @@ public enum JavaGcCleanerWrapper {
 				"clean"),
 			new PendingCleanersRunnerProvider(
 				name,
-				reflectionUtils,
 				"tryHandlePending",
 				LEGACY_WAIT_FOR_REFERENCE_PROCESSING_ARGS,
 				LEGACY_WAIT_FOR_REFERENCE_PROCESSING_ARG_TYPES));
@@ -126,7 +125,6 @@ public enum JavaGcCleanerWrapper {
 				"clean"),
 			new PendingCleanersRunnerProvider(
 				name,
-				reflectionUtils,
 				"waitForReferenceProcessing",
 				JAVA9_WAIT_FOR_REFERENCE_PROCESSING_ARGS,
 				JAVA9_WAIT_FOR_REFERENCE_PROCESSING_ARG_TYPES));
@@ -189,12 +187,13 @@ public enum JavaGcCleanerWrapper {
 	private static class CleanerManager {
 		private final String cleanerName;
 		private final CleanerFactory cleanerFactory;
+		@Nullable
 		private final PendingCleanersRunner pendingCleanersRunner;
 
 		private CleanerManager(
 				String cleanerName,
 				CleanerFactory cleanerFactory,
-				PendingCleanersRunner pendingCleanersRunner) {
+				@Nullable PendingCleanersRunner pendingCleanersRunner) {
 			this.cleanerName = cleanerName;
 			this.cleanerFactory = cleanerFactory;
 			this.pendingCleanersRunner = pendingCleanersRunner;
@@ -205,7 +204,7 @@ public enum JavaGcCleanerWrapper {
 		}
 
 		private boolean tryRunPendingCleaners() throws InterruptedException {
-			return pendingCleanersRunner.tryRunPendingCleaners();
+			return pendingCleanersRunner != null && pendingCleanersRunner.tryRunPendingCleaners();
 		}
 
 		@Override
@@ -303,32 +302,38 @@ public enum JavaGcCleanerWrapper {
 	private static class PendingCleanersRunnerProvider {
 		private static final String REFERENCE_CLASS = "java.lang.ref.Reference";
 		private final String cleanerName;
-		private final ReflectionUtils reflectionUtils;
 		private final String waitForReferenceProcessingName;
 		private final Object[] waitForReferenceProcessingArgs;
 		private final Class<?>[] waitForReferenceProcessingArgTypes;
 
 		private PendingCleanersRunnerProvider(
 			String cleanerName,
-			ReflectionUtils reflectionUtils,
 			String waitForReferenceProcessingName,
 			Object[] waitForReferenceProcessingArgs,
 			Class<?>[] waitForReferenceProcessingArgTypes) {
 			this.cleanerName = cleanerName;
-			this.reflectionUtils = reflectionUtils;
 			this.waitForReferenceProcessingName = waitForReferenceProcessingName;
 			this.waitForReferenceProcessingArgs = waitForReferenceProcessingArgs;
 			this.waitForReferenceProcessingArgTypes = waitForReferenceProcessingArgTypes;
 		}
 
+		@Nullable
 		private PendingCleanersRunner createPendingCleanersRunner() {
-			Class<?> referenceClass = reflectionUtils.findClass(REFERENCE_CLASS);
-			Method waitForReferenceProcessingMethod = reflectionUtils.findMethod(
-				referenceClass,
-				waitForReferenceProcessingName,
-				waitForReferenceProcessingArgTypes);
-			waitForReferenceProcessingMethod.setAccessible(true);
-			return new PendingCleanersRunner(waitForReferenceProcessingMethod, waitForReferenceProcessingArgs);
+			try {
+				Class<?> referenceClass = Class.forName(REFERENCE_CLASS);
+				Method waitForReferenceProcessingMethod = referenceClass.getDeclaredMethod(
+					waitForReferenceProcessingName,
+					waitForReferenceProcessingArgTypes);
+				waitForReferenceProcessingMethod.setAccessible(true);
+				return new PendingCleanersRunner(waitForReferenceProcessingMethod, waitForReferenceProcessingArgs);
+			} catch (ClassNotFoundException | NoSuchMethodException e) {
+				LOG.warn(
+					"Cannot run pending GC phantom cleaners. " +
+						"This can result in suboptimal memory management or failures. " +
+						"Try to upgrade to Java 8u72 or higher.",
+					e);
+				return null;
+			}
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
index 8063cd7..96888f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
@@ -158,8 +158,13 @@ class UnsafeMemoryBudget {
 			}
 
 			// no luck
-			throw new MemoryReservationException(
-				String.format("Could not allocate %d bytes, only %d bytes are remaining", size, availableOrReserved));
+			throw new MemoryReservationException(String.format(
+				"Could not allocate %d bytes, only %d bytes are remaining. This usually indicates " +
+					"that you are requesting more memory than you have reserved. " +
+					"However, when running an old JVM version it can also be caused by slow garbage collection. " +
+					"Try to upgrade to Java 8u72 or higher if running on an old Java version.",
+				size,
+				availableOrReserved));
 
 		} finally {
 			if (interrupted) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 82fc364..dda321f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -317,7 +317,11 @@ public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
 		after.thenRunAsync(
 			() -> {
 				if (!memoryManager.verifyEmpty()) {
-					LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
+					LOG.warn(
+						"Not all slot managed memory is freed at {}. This usually indicates memory leak. " +
+							"However, when running an old JVM version it can also be caused by slow garbage collection. " +
+							"Try to upgrade to Java 8u72 or higher if running on an old Java version.",
+						this);
 				}
 			},
 			asyncExecutor);