You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/13 09:00:14 UTC
[flink] 02/02: [FLINK-18815] Close safety net guarded closeable iff
it is still registered
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3549001b13bc14f170cad4b1427fd04ae9a93bcf
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Wed Aug 12 11:35:05 2020 +0800
[FLINK-18815] Close safety net guarded closeable iff it is still registered
This closes #13124.
---
.../org/apache/flink/core/fs/SafetyNetCloseableRegistry.java | 10 +++++++---
.../java/org/apache/flink/util/AbstractCloseableRegistry.java | 4 ++--
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index c22df03..8b2ed4c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -175,8 +175,13 @@ public class SafetyNetCloseableRegistry extends
@Override
public void close() throws IOException {
- closeableRegistry.removeCloseableInternal(innerCloseable);
- innerCloseable.close();
+ // Mark sure the inner closeable is still registered and thus unclosed to
+ // prevent duplicated and concurrent closing from registry closing. This could
+ // happen if registry is closing after this phantom reference was enqueued.
+ if (closeableRegistry.removeCloseableInternal(innerCloseable)) {
+ LOG.warn("Closing unclosed resource via safety-net: {}", getDebugString());
+ innerCloseable.close();
+ }
}
}
@@ -205,7 +210,6 @@ public class SafetyNetCloseableRegistry extends
if (toClose != null) {
try {
- LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());
toClose.close();
}
catch (Throwable t) {
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index ff4febd..53518ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -163,9 +163,9 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
/**
* Removes a mapping from the registry map, respecting locking.
*/
- protected final void removeCloseableInternal(Closeable closeable) {
+ protected final boolean removeCloseableInternal(Closeable closeable) {
synchronized (getSynchronizationLock()) {
- closeableToRef.remove(closeable);
+ return closeableToRef.remove(closeable) != null;
}
}