You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/30 16:51:12 UTC
flink git commit: [FLINK-5681] [runtime] Make ReaperThread for
SafetyNetCloseableRegistry a singleton
Repository: flink
Updated Branches:
refs/heads/master dcfa3fbb0 -> ec3eb593a
[FLINK-5681] [runtime] Make ReaperThread for SafetyNetCloseableRegistry a singleton
This closes #3230
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec3eb593
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec3eb593
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec3eb593
Branch: refs/heads/master
Commit: ec3eb593ae93123cf54cd34c452618d8bd0a7876
Parents: dcfa3fb
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Jan 27 19:51:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 30 16:45:58 2017 +0100
----------------------------------------------------------------------
.../core/fs/SafetyNetCloseableRegistry.java | 122 ++++++++++++-------
.../core/fs/SafetyNetCloseableRegistryTest.java | 66 +++++++---
2 files changed, 128 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec3eb593/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index de4fb30..8b28fa2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -18,20 +18,21 @@
package org.apache.flink.core.fs;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.AbstractCloseableRegistry;
-import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingProxyUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
import java.io.IOException;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
/**
@@ -45,19 +46,35 @@ import java.util.Map;
* <p>
* All methods in this class are thread-safe.
*/
+@Internal
public class SafetyNetCloseableRegistry extends
AbstractCloseableRegistry<WrappingProxyCloseable<? extends Closeable>,
SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
- private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;
- private final Thread reaperThread;
+
+ /** Lock for accessing reaper thread and registry count */
+ private static final Object REAPER_THREAD_LOCK = new Object();
+
+ /** Singleton reaper thread takes care of all registries in VM */
+ @GuardedBy("REAPER_THREAD_LOCK")
+ private static CloseableReaperThread REAPER_THREAD = null;
+
+ /** Global count of all instances of SafetyNetCloseableRegistry */
+ @GuardedBy("REAPER_THREAD_LOCK")
+ private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
public SafetyNetCloseableRegistry() {
super(new IdentityHashMap<Closeable, PhantomDelegatingCloseableRef>());
- this.referenceQueue = new ReferenceQueue<>();
- this.reaperThread = new CloseableReaperThread();
- reaperThread.start();
+
+ synchronized (REAPER_THREAD_LOCK) {
+ if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
+ Preconditions.checkState(null == REAPER_THREAD);
+ REAPER_THREAD = new CloseableReaperThread();
+ REAPER_THREAD.start();
+ }
+ ++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
+ }
}
@Override
@@ -65,14 +82,18 @@ public class SafetyNetCloseableRegistry extends
WrappingProxyCloseable<? extends Closeable> wrappingProxyCloseable,
Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) throws IOException {
+ assert Thread.holdsLock(getSynchronizationLock());
+
Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
if (null == innerCloseable) {
return;
}
- PhantomDelegatingCloseableRef phantomRef =
- new PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+ PhantomDelegatingCloseableRef phantomRef = new PhantomDelegatingCloseableRef(
+ wrappingProxyCloseable,
+ this,
+ REAPER_THREAD.referenceQueue);
closeableMap.put(innerCloseable, phantomRef);
}
@@ -82,6 +103,8 @@ public class SafetyNetCloseableRegistry extends
WrappingProxyCloseable<? extends Closeable> closeable,
Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
+ assert Thread.holdsLock(getSynchronizationLock());
+
Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
if (null == innerCloseable) {
@@ -91,6 +114,29 @@ public class SafetyNetCloseableRegistry extends
closeableMap.remove(innerCloseable);
}
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ }
+ finally {
+ synchronized (REAPER_THREAD_LOCK) {
+ --GLOBAL_SAFETY_NET_REGISTRY_COUNT;
+ if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
+ REAPER_THREAD.interrupt();
+ REAPER_THREAD = null;
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public static boolean isReaperThreadRunning() {
+ synchronized (REAPER_THREAD_LOCK) {
+ return null != REAPER_THREAD && REAPER_THREAD.isAlive();
+ }
+ }
+
/**
* Phantom reference to {@link WrappingProxyCloseable}.
*/
@@ -99,27 +145,29 @@ public class SafetyNetCloseableRegistry extends
implements Closeable {
private final Closeable innerCloseable;
+ private final SafetyNetCloseableRegistry closeableRegistry;
private final String debugString;
public PhantomDelegatingCloseableRef(
WrappingProxyCloseable<? extends Closeable> referent,
+ SafetyNetCloseableRegistry closeableRegistry,
ReferenceQueue<? super WrappingProxyCloseable<? extends Closeable>> q) {
super(referent, q);
this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
+ this.closeableRegistry = Preconditions.checkNotNull(closeableRegistry);
this.debugString = referent.toString();
}
- public Closeable getInnerCloseable() {
- return innerCloseable;
- }
-
public String getDebugString() {
return debugString;
}
@Override
public void close() throws IOException {
+ synchronized (closeableRegistry.getSynchronizationLock()) {
+ closeableRegistry.closeableToRef.remove(innerCloseable);
+ }
innerCloseable.close();
}
}
@@ -127,39 +175,35 @@ public class SafetyNetCloseableRegistry extends
/**
* Reaper runnable collects and closes leaking resources
*/
- final class CloseableReaperThread extends Thread {
+ static final class CloseableReaperThread extends Thread {
- public CloseableReaperThread() {
- super("CloseableReaperThread");
- this.running = false;
- }
+ private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;
private volatile boolean running;
+ private CloseableReaperThread() {
+ super("CloseableReaperThread");
+ this.setDaemon(true);
+
+ this.referenceQueue = new ReferenceQueue<>();
+ this.running = true;
+ }
+
@Override
public void run() {
- this.running = true;
try {
- List<PhantomDelegatingCloseableRef> closeableList = new LinkedList<>();
while (running) {
- PhantomDelegatingCloseableRef oldRef = (PhantomDelegatingCloseableRef) referenceQueue.remove();
- synchronized (getSynchronizationLock()) {
- do {
- closeableList.add(oldRef);
- closeableToRef.remove(oldRef.getInnerCloseable());
+ final PhantomDelegatingCloseableRef toClose = (PhantomDelegatingCloseableRef) referenceQueue.remove();
+
+ if (toClose != null) {
+ try {
+ LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());
+ toClose.close();
}
- while ((oldRef = (PhantomDelegatingCloseableRef) referenceQueue.poll()) != null);
- }
-
- // close outside the synchronized block in case this is blocking
- for (PhantomDelegatingCloseableRef closeableRef : closeableList) {
- IOUtils.closeQuietly(closeableRef);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing unclosed resource: " + closeableRef.getDebugString());
+ catch (Throwable t) {
+ LOG.debug("Error while closing resource via safety-net", t);
}
}
-
- closeableList.clear();
}
} catch (InterruptedException e) {
// done
@@ -172,10 +216,4 @@ public class SafetyNetCloseableRegistry extends
super.interrupt();
}
}
-
- @Override
- public void close() throws IOException {
- super.close();
- reaperThread.interrupt();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec3eb593/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 40856b4..6870780 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -18,21 +18,29 @@
package org.apache.flink.core.fs;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.util.ExceptionUtils;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.Closeable;
import java.io.IOException;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
public class SafetyNetCloseableRegistryTest {
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
private ProducerThread[] streamOpenThreads;
private SafetyNetCloseableRegistry closeableRegistry;
private AtomicInteger unclosedCounter;
public void setup() {
+ Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
this.closeableRegistry = new SafetyNetCloseableRegistry();
this.unclosedCounter = new AtomicInteger(0);
this.streamOpenThreads = new ProducerThread[10];
@@ -41,6 +49,11 @@ public class SafetyNetCloseableRegistryTest {
}
}
+ @After
+ public void tearDown() {
+ Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+ }
+
private void startThreads(int maxStreams) {
for (ProducerThread t : streamOpenThreads) {
t.setMaxStreams(maxStreams);
@@ -56,9 +69,10 @@ public class SafetyNetCloseableRegistryTest {
@Test
public void testCorrectScopesForSafetyNet() throws Exception {
- Thread t1 = new Thread() {
+ CheckedThread t1 = new CheckedThread() {
+
@Override
- public void run() {
+ public void go() throws Exception {
try {
FileSystem fs1 = FileSystem.getLocalFileSystem();
// ensure no safety net in place
@@ -67,11 +81,13 @@ public class SafetyNetCloseableRegistryTest {
fs1 = FileSystem.getLocalFileSystem();
// ensure safety net is in place now
Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem);
- Path tmp = new Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString());
+
+ Path tmp = new Path(tmpFolder.newFolder().toURI().toString(), "test_file");
+
try (FSDataOutputStream stream = fs1.create(tmp, false)) {
- Thread t2 = new Thread() {
+ CheckedThread t2 = new CheckedThread() {
@Override
- public void run() {
+ public void go() {
FileSystem fs2 = FileSystem.getLocalFileSystem();
// ensure the safety net does not leak here
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
@@ -85,12 +101,9 @@ public class SafetyNetCloseableRegistryTest {
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
}
};
+
t2.start();
- try {
- t2.join();
- } catch (InterruptedException e) {
- Assert.fail();
- }
+ t2.sync();
//ensure stream is still open and was never closed by any interferences
stream.write(42);
@@ -110,13 +123,13 @@ public class SafetyNetCloseableRegistryTest {
fs1.delete(tmp, false);
}
} catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
+ Assert.fail(ExceptionUtils.stringifyException(e));
}
}
};
+
t1.start();
- t1.join();
+ t1.sync();
}
@Test
@@ -177,6 +190,23 @@ public class SafetyNetCloseableRegistryTest {
closeableRegistry.close();
}
+ @Test
+ public void testReaperThreadSpawnAndStop() throws Exception {
+ Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+
+ try (SafetyNetCloseableRegistry r1 = new SafetyNetCloseableRegistry()) {
+ Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
+
+ try (SafetyNetCloseableRegistry r2 = new SafetyNetCloseableRegistry()) {
+ Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
+ }
+ Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
+ }
+ Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+ }
+
+ //------------------------------------------------------------------------------------------------------------------
+
private static final class ProducerThread extends Thread {
private final SafetyNetCloseableRegistry registry;
@@ -205,13 +235,13 @@ public class SafetyNetCloseableRegistryTest {
String debug = Thread.currentThread().getName() + " " + count;
TestStream testStream = new TestStream(refCount);
refCount.incrementAndGet();
+
+ @SuppressWarnings("unused")
ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference dies here
try {
Thread.sleep(2);
- } catch (InterruptedException e) {
-
- }
+ } catch (InterruptedException ignored) {}
if (maxStreams != Integer.MAX_VALUE) {
--maxStreams;
@@ -219,7 +249,7 @@ public class SafetyNetCloseableRegistryTest {
++count;
}
} catch (Exception ex) {
-
+ // ignored
}
}
}