You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/09 12:46:12 UTC

[flink] 01/05: [hotfix][test] Guarantee order of CloseableRegistry

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a68dc3c440c79c031d77d0082b7f8e4707311382
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 5 16:50:57 2019 +0200

    [hotfix][test] Guarantee order of CloseableRegistry
---
 .../java/org/apache/flink/core/fs/CloseableRegistry.java | 16 ++++++++++++++--
 .../org/apache/flink/util/AbstractCloseableRegistry.java |  8 ++++++--
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 5f1c9fb..07e8b5b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -24,7 +24,10 @@ import org.apache.flink.util.AbstractCloseableRegistry;
 import javax.annotation.Nonnull;
 
 import java.io.Closeable;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -33,6 +36,8 @@ import java.util.Map;
  * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable}.
  *
  * <p>All methods in this class are thread-safe.
+ *
+ * <p>This class closes all registered {@link Closeable}s in the reverse registration order.
  */
 @Internal
 public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
@@ -40,7 +45,7 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
 	private static final Object DUMMY = new Object();
 
 	public CloseableRegistry() {
-		super(new HashMap<>());
+		super(new LinkedHashMap<>());
 	}
 
 	@Override
@@ -52,4 +57,11 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
 	protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) {
 		return closeableMap.remove(closeable) != null;
 	}
+
+	@Override
+	protected Collection<Closeable> getReferencesToClose() {
+		ArrayList<Closeable> closeablesToClose = new ArrayList<>(closeableToRef.keySet());
+		Collections.reverse(closeablesToClose);
+		return closeablesToClose;
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index e6589f6..ff4febd 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -49,7 +49,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 
 	/** Map from tracked Closeables to some associated meta data. */
 	@GuardedBy("lock")
-	private final Map<Closeable, T> closeableToRef;
+	protected final Map<Closeable, T> closeableToRef;
 
 	/** Indicates if this registry is closed. */
 	@GuardedBy("lock")
@@ -114,7 +114,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 
 			closed = true;
 
-			toCloseCopy = new ArrayList<>(closeableToRef.keySet());
+			toCloseCopy = getReferencesToClose();
 
 			closeableToRef.clear();
 		}
@@ -128,6 +128,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 		}
 	}
 
+	protected Collection<Closeable> getReferencesToClose() {
+		return new ArrayList<>(closeableToRef.keySet());
+	}
+
 	/**
 	 * Does the actual registration of the closeable with the registry map. This should not do any long running or
 	 * potentially blocking operations as it is executed under the registry's lock.