You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/13 09:00:14 UTC

[flink] 02/02: [FLINK-18815] Close safety net guarded closeable iff it is still registered

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3549001b13bc14f170cad4b1427fd04ae9a93bcf
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Wed Aug 12 11:35:05 2020 +0800

    [FLINK-18815] Close safety net guarded closeable iff it is still registered
    
    This closes #13124.
---
 .../org/apache/flink/core/fs/SafetyNetCloseableRegistry.java   | 10 +++++++---
 .../java/org/apache/flink/util/AbstractCloseableRegistry.java  |  4 ++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index c22df03..8b2ed4c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -175,8 +175,13 @@ public class SafetyNetCloseableRegistry extends
 
 		@Override
 		public void close() throws IOException {
-			closeableRegistry.removeCloseableInternal(innerCloseable);
-			innerCloseable.close();
+			// Mark sure the inner closeable is still registered and thus unclosed to
+			// prevent duplicated and concurrent closing from registry closing. This could
+			// happen if registry is closing after this phantom reference was enqueued.
+			if (closeableRegistry.removeCloseableInternal(innerCloseable)) {
+				LOG.warn("Closing unclosed resource via safety-net: {}", getDebugString());
+				innerCloseable.close();
+			}
 		}
 	}
 
@@ -205,7 +210,6 @@ public class SafetyNetCloseableRegistry extends
 
 					if (toClose != null) {
 						try {
-							LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());
 							toClose.close();
 						}
 						catch (Throwable t) {
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index ff4febd..53518ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -163,9 +163,9 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	/**
 	 * Removes a mapping from the registry map, respecting locking.
 	 */
-	protected final void removeCloseableInternal(Closeable closeable) {
+	protected final boolean removeCloseableInternal(Closeable closeable) {
 		synchronized (getSynchronizationLock()) {
-			closeableToRef.remove(closeable);
+			return closeableToRef.remove(closeable) != null;
 		}
 	}