You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/09 12:46:12 UTC
[flink] 01/05: [hotfix][test] Guarantee order of CloseableRegistry
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a68dc3c440c79c031d77d0082b7f8e4707311382
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 5 16:50:57 2019 +0200
[hotfix][test] Guarantee order of CloseableRegistry
---
.../java/org/apache/flink/core/fs/CloseableRegistry.java | 16 ++++++++++++++--
.../org/apache/flink/util/AbstractCloseableRegistry.java | 8 ++++++--
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 5f1c9fb..07e8b5b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -24,7 +24,10 @@ import org.apache.flink.util.AbstractCloseableRegistry;
import javax.annotation.Nonnull;
import java.io.Closeable;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
/**
@@ -33,6 +36,8 @@ import java.util.Map;
* <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable}.
*
* <p>All methods in this class are thread-safe.
+ *
+ * <p>This class closes all registered {@link Closeable}s in the reverse registration order.
*/
@Internal
public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
@@ -40,7 +45,7 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
private static final Object DUMMY = new Object();
public CloseableRegistry() {
- super(new HashMap<>());
+ super(new LinkedHashMap<>());
}
@Override
@@ -52,4 +57,11 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) {
return closeableMap.remove(closeable) != null;
}
+
+ @Override
+ protected Collection<Closeable> getReferencesToClose() {
+ ArrayList<Closeable> closeablesToClose = new ArrayList<>(closeableToRef.keySet());
+ Collections.reverse(closeablesToClose);
+ return closeablesToClose;
+ }
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index e6589f6..ff4febd 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -49,7 +49,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
/** Map from tracked Closeables to some associated meta data. */
@GuardedBy("lock")
- private final Map<Closeable, T> closeableToRef;
+ protected final Map<Closeable, T> closeableToRef;
/** Indicates if this registry is closed. */
@GuardedBy("lock")
@@ -114,7 +114,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
closed = true;
- toCloseCopy = new ArrayList<>(closeableToRef.keySet());
+ toCloseCopy = getReferencesToClose();
closeableToRef.clear();
}
@@ -128,6 +128,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
}
}
+ protected Collection<Closeable> getReferencesToClose() {
+ return new ArrayList<>(closeableToRef.keySet());
+ }
+
/**
* Does the actual registration of the closeable with the registry map. This should not do any long running or
* potentially blocking operations as it is executed under the registry's lock.