Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/13 09:01:39 UTC

[flink] branch release-1.10 updated (3e17833 -> 69c5553)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3e17833  [FLINK-18646] Verify memory manager empty in a separate thread with larger timeout
     new dd81861  [FLINK-18815] Change Thread.sleep(2) to Thread.sleep(0) to fail SafetyNetCloseableRegistryTest#testClose more often
     new 69c5553  [FLINK-18815] Close safety net guarded closeable iff it is still registered

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/core/fs/SafetyNetCloseableRegistry.java   | 10 +++++++---
 .../java/org/apache/flink/util/AbstractCloseableRegistry.java  |  4 ++--
 .../apache/flink/core/fs/AbstractCloseableRegistryTest.java    |  2 +-
 3 files changed, 10 insertions(+), 6 deletions(-)


[flink] 02/02: [FLINK-18815] Close safety net guarded closeable iff it is still registered

Posted by tr...@apache.org.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69c5553206bf722175e919f0d35a9b16373729bf
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Wed Aug 12 11:35:05 2020 +0800

    [FLINK-18815] Close safety net guarded closeable iff it is still registered
    
    This closes #13124.
---
 .../org/apache/flink/core/fs/SafetyNetCloseableRegistry.java   | 10 +++++++---
 .../java/org/apache/flink/util/AbstractCloseableRegistry.java  |  4 ++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 870dcbf..143b1d6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -164,8 +164,13 @@ public class SafetyNetCloseableRegistry extends
 
 		@Override
 		public void close() throws IOException {
-			closeableRegistry.removeCloseableInternal(innerCloseable);
-			innerCloseable.close();
+			// Make sure the inner closeable is still registered and thus unclosed to
+			// prevent duplicated and concurrent closing from registry closing. This could
+			// happen if registry is closing after this phantom reference was enqueued.
+			if (closeableRegistry.removeCloseableInternal(innerCloseable)) {
+				LOG.warn("Closing unclosed resource via safety-net: {}", getDebugString());
+				innerCloseable.close();
+			}
 		}
 	}
 
@@ -194,7 +199,6 @@ public class SafetyNetCloseableRegistry extends
 
 					if (toClose != null) {
 						try {
-							LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());
 							toClose.close();
 						}
 						catch (Throwable t) {
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index ff4febd..53518ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -163,9 +163,9 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	/**
 	 * Removes a mapping from the registry map, respecting locking.
 	 */
-	protected final void removeCloseableInternal(Closeable closeable) {
+	protected final boolean removeCloseableInternal(Closeable closeable) {
 		synchronized (getSynchronizationLock()) {
-			closeableToRef.remove(closeable);
+			return closeableToRef.remove(closeable) != null;
 		}
 	}
 

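For readers skimming the diff above, the core idea can be sketched outside of Flink: removal from the registry returns whether the entry was still present, and only the caller that actually removed it goes on to close the resource. The classes below (`Registry`, `Guard`, `CountingCloseable`) are hypothetical stand-ins written for illustration, not Flink's `SafetyNetCloseableRegistry` or `AbstractCloseableRegistry`; only the "remove returns boolean, close iff removed" pattern is taken from the commit.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseIfRegisteredSketch {

    /** Hypothetical stand-in for a closeable registry; not Flink's class. */
    static final class Registry {
        private final Object lock = new Object();
        private final Map<Closeable, Boolean> registered = new HashMap<>();

        void register(Closeable closeable) {
            synchronized (lock) {
                registered.put(closeable, Boolean.TRUE);
            }
        }

        /** Returns true only for the caller that actually removed the entry. */
        boolean remove(Closeable closeable) {
            synchronized (lock) {
                return registered.remove(closeable) != null;
            }
        }
    }

    /** Hypothetical guard: closes the inner resource only if it was still registered. */
    static final class Guard implements Closeable {
        private final Registry registry;
        private final Closeable inner;

        Guard(Registry registry, Closeable inner) {
            this.registry = registry;
            this.inner = inner;
        }

        @Override
        public void close() throws IOException {
            // If someone else already removed (and closed) the resource, do nothing.
            if (registry.remove(inner)) {
                inner.close();
            }
        }
    }

    /** Test helper that counts how often close() is called. */
    static final class CountingCloseable implements Closeable {
        final AtomicInteger closeCalls = new AtomicInteger();

        @Override
        public void close() {
            closeCalls.incrementAndGet();
        }
    }

    public static void main(String[] args) throws IOException {
        Registry registry = new Registry();
        CountingCloseable resource = new CountingCloseable();
        registry.register(resource);

        Guard guard = new Guard(registry, resource);
        guard.close(); // removes the entry and closes the resource
        guard.close(); // entry is gone, so the resource is not closed again

        System.out.println(resource.closeCalls.get()); // prints 1
    }
}
```

In this sketch the second `close()` is a no-op because the map entry is already gone, which mirrors the intent stated in the commit's comment: avoid duplicated closing when the registry and the safety net both reach the same resource.
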

[flink] 01/02: [FLINK-18815] Change Thread.sleep(2) to Thread.sleep(0) to fail SafetyNetCloseableRegistryTest#testClose more often

Posted by tr...@apache.org.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd818610fd6a2bfa26597c8dbd1ad0753b365772
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Wed Aug 12 11:26:16 2020 +0800

    [FLINK-18815] Change Thread.sleep(2) to Thread.sleep(0) to fail SafetyNetCloseableRegistryTest#testClose more often
---
 .../java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index ab14f77..fb1cc82 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -164,7 +164,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
 					createAndRegisterStream();
 
 					try {
-						Thread.sleep(2);
+						Thread.sleep(0);
 					} catch (InterruptedException ignored) {}
 
 					if (maxStreams != Integer.MAX_VALUE) {
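
The test change above shortens the sleep so that the concurrent close race is hit more often. As a rough, self-contained illustration of the invariant such a race should preserve, the sketch below lets two threads compete to remove the same entry under a lock and counts how many of them "win" and would go on to close the resource. All names here (`RacingCloseSketch`, `raceOnce`, the `"stream"` key) are hypothetical and chosen for illustration; this is not the Flink test itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class RacingCloseSketch {

    /** Races two closers over one registered entry; returns how many of them closed it. */
    static int raceOnce() throws InterruptedException {
        final Object lock = new Object();
        final Map<String, Boolean> registered = new HashMap<>();
        registered.put("stream", Boolean.TRUE);

        final AtomicInteger closes = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);

        Runnable closer = () -> {
            try {
                start.await(); // release both threads at the same moment
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            boolean removed;
            synchronized (lock) {
                removed = registered.remove("stream") != null;
            }
            if (removed) {
                closes.incrementAndGet(); // stands in for innerCloseable.close()
            }
        };

        Thread first = new Thread(closer);
        Thread second = new Thread(closer);
        first.start();
        second.start();
        start.countDown();
        first.join();
        second.join();
        return closes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int round = 0; round < 1000; round++) {
            if (raceOnce() != 1) {
                throw new AssertionError("resource closed more or less than once");
            }
        }
        System.out.println("each round closed exactly once");
    }
}
```

Whichever thread gets the lock first removes the entry and performs the close; the other sees `null` from `remove` and skips it, so the count is 1 regardless of scheduling.
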