You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/30 16:51:12 UTC

flink git commit: [FLINK-5681] [runtime] Make ReaperThread for SafetyNetCloseableRegistry a singleton

Repository: flink
Updated Branches:
  refs/heads/master dcfa3fbb0 -> ec3eb593a


[FLINK-5681] [runtime] Make ReaperThread for SafetyNetCloseableRegistry a singleton

This closes #3230


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec3eb593
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec3eb593
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec3eb593

Branch: refs/heads/master
Commit: ec3eb593ae93123cf54cd34c452618d8bd0a7876
Parents: dcfa3fb
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Jan 27 19:51:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 30 16:45:58 2017 +0100

----------------------------------------------------------------------
 .../core/fs/SafetyNetCloseableRegistry.java     | 122 ++++++++++++-------
 .../core/fs/SafetyNetCloseableRegistryTest.java |  66 +++++++---
 2 files changed, 128 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec3eb593/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index de4fb30..8b28fa2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.AbstractCloseableRegistry;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingProxyUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
 import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -45,19 +46,35 @@ import java.util.Map;
  * <p>
  * All methods in this class are thread-safe.
  */
+@Internal
 public class SafetyNetCloseableRegistry extends
 		AbstractCloseableRegistry<WrappingProxyCloseable<? extends Closeable>,
 				SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
-	private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;
-	private final Thread reaperThread;
+
+	/** Lock for accessing reaper thread and registry count */
+	private static final Object REAPER_THREAD_LOCK = new Object();
+
+	/** Singleton reaper thread takes care of all registries in VM */
+	@GuardedBy("REAPER_THREAD_LOCK")
+	private static CloseableReaperThread REAPER_THREAD = null;
+
+	/** Global count of all instances of SafetyNetCloseableRegistry */
+	@GuardedBy("REAPER_THREAD_LOCK")
+	private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
 
 	public SafetyNetCloseableRegistry() {
 		super(new IdentityHashMap<Closeable, PhantomDelegatingCloseableRef>());
-		this.referenceQueue = new ReferenceQueue<>();
-		this.reaperThread = new CloseableReaperThread();
-		reaperThread.start();
+
+		synchronized (REAPER_THREAD_LOCK) {
+			if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
+				Preconditions.checkState(null == REAPER_THREAD);
+				REAPER_THREAD = new CloseableReaperThread();
+				REAPER_THREAD.start();
+			}
+			++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
+		}
 	}
 
 	@Override
@@ -65,14 +82,18 @@ public class SafetyNetCloseableRegistry extends
 			WrappingProxyCloseable<? extends Closeable> wrappingProxyCloseable,
 			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) throws IOException {
 
+		assert Thread.holdsLock(getSynchronizationLock());
+
 		Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate());
 
 		if (null == innerCloseable) {
 			return;
 		}
 
-		PhantomDelegatingCloseableRef phantomRef =
-				new PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue);
+		PhantomDelegatingCloseableRef phantomRef = new PhantomDelegatingCloseableRef(
+				wrappingProxyCloseable,
+				this,
+				REAPER_THREAD.referenceQueue);
 
 		closeableMap.put(innerCloseable, phantomRef);
 	}
@@ -82,6 +103,8 @@ public class SafetyNetCloseableRegistry extends
 			WrappingProxyCloseable<? extends Closeable> closeable,
 			Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
 
+		assert Thread.holdsLock(getSynchronizationLock());
+
 		Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
 
 		if (null == innerCloseable) {
@@ -91,6 +114,29 @@ public class SafetyNetCloseableRegistry extends
 		closeableMap.remove(innerCloseable);
 	}
 
+	@Override
+	public void close() throws IOException {
+		try {
+			super.close();
+		}
+		finally {
+			synchronized (REAPER_THREAD_LOCK) {
+				--GLOBAL_SAFETY_NET_REGISTRY_COUNT;
+				if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
+					REAPER_THREAD.interrupt();
+					REAPER_THREAD = null;
+				}
+			}
+		}
+	}
+
+	@VisibleForTesting
+	public static boolean isReaperThreadRunning() {
+		synchronized (REAPER_THREAD_LOCK) {
+			return null != REAPER_THREAD && REAPER_THREAD.isAlive();
+		}
+	}
+
 	/**
 	 * Phantom reference to {@link WrappingProxyCloseable}.
 	 */
@@ -99,27 +145,29 @@ public class SafetyNetCloseableRegistry extends
 			implements Closeable {
 
 		private final Closeable innerCloseable;
+		private final SafetyNetCloseableRegistry closeableRegistry;
 		private final String debugString;
 
 		public PhantomDelegatingCloseableRef(
 				WrappingProxyCloseable<? extends Closeable> referent,
+				SafetyNetCloseableRegistry closeableRegistry,
 				ReferenceQueue<? super WrappingProxyCloseable<? extends Closeable>> q) {
 
 			super(referent, q);
 			this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
+			this.closeableRegistry = Preconditions.checkNotNull(closeableRegistry);
 			this.debugString = referent.toString();
 		}
 
-		public Closeable getInnerCloseable() {
-			return innerCloseable;
-		}
-
 		public String getDebugString() {
 			return debugString;
 		}
 
 		@Override
 		public void close() throws IOException {
+			synchronized (closeableRegistry.getSynchronizationLock()) {
+				closeableRegistry.closeableToRef.remove(innerCloseable);
+			}
 			innerCloseable.close();
 		}
 	}
@@ -127,39 +175,35 @@ public class SafetyNetCloseableRegistry extends
 	/**
 	 * Reaper runnable collects and closes leaking resources
 	 */
-	final class CloseableReaperThread extends Thread {
+	static final class CloseableReaperThread extends Thread {
 
-		public CloseableReaperThread() {
-			super("CloseableReaperThread");
-			this.running = false;
-		}
+		private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;
 
 		private volatile boolean running;
 
+		private CloseableReaperThread() {
+			super("CloseableReaperThread");
+			this.setDaemon(true);
+
+			this.referenceQueue = new ReferenceQueue<>();
+			this.running = true;
+		}
+
 		@Override
 		public void run() {
-			this.running = true;
 			try {
-				List<PhantomDelegatingCloseableRef> closeableList = new LinkedList<>();
 				while (running) {
-					PhantomDelegatingCloseableRef oldRef = (PhantomDelegatingCloseableRef) referenceQueue.remove();
-					synchronized (getSynchronizationLock()) {
-						do {
-							closeableList.add(oldRef);
-							closeableToRef.remove(oldRef.getInnerCloseable());
+					final PhantomDelegatingCloseableRef toClose = (PhantomDelegatingCloseableRef) referenceQueue.remove();
+					
+					if (toClose != null) {
+						try {
+							LOG.warn("Closing unclosed resource via safety-net: {}", toClose.getDebugString());
+							toClose.close();
 						}
-						while ((oldRef = (PhantomDelegatingCloseableRef) referenceQueue.poll()) != null);
-					}
-
-					// close outside the synchronized block in case this is blocking
-					for (PhantomDelegatingCloseableRef closeableRef : closeableList) {
-						IOUtils.closeQuietly(closeableRef);
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Closing unclosed resource: " + closeableRef.getDebugString());
+						catch (Throwable t) {
+							LOG.debug("Error while closing resource via safety-net", t);
 						}
 					}
-
-					closeableList.clear();
 				}
 			} catch (InterruptedException e) {
 				// done
@@ -172,10 +216,4 @@ public class SafetyNetCloseableRegistry extends
 			super.interrupt();
 		}
 	}
-
-	@Override
-	public void close() throws IOException {
-		super.close();
-		reaperThread.interrupt();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec3eb593/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 40856b4..6870780 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -18,21 +18,29 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.util.ExceptionUtils;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class SafetyNetCloseableRegistryTest {
 
+	@Rule
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
 	private ProducerThread[] streamOpenThreads;
 	private SafetyNetCloseableRegistry closeableRegistry;
 	private AtomicInteger unclosedCounter;
 
 	public void setup() {
+		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
 		this.closeableRegistry = new SafetyNetCloseableRegistry();
 		this.unclosedCounter = new AtomicInteger(0);
 		this.streamOpenThreads = new ProducerThread[10];
@@ -41,6 +49,11 @@ public class SafetyNetCloseableRegistryTest {
 		}
 	}
 
+	@After
+	public void tearDown() {
+		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+	}
+
 	private void startThreads(int maxStreams) {
 		for (ProducerThread t : streamOpenThreads) {
 			t.setMaxStreams(maxStreams);
@@ -56,9 +69,10 @@ public class SafetyNetCloseableRegistryTest {
 
 	@Test
 	public void testCorrectScopesForSafetyNet() throws Exception {
-		Thread t1 = new Thread() {
+		CheckedThread t1 = new CheckedThread() {
+
 			@Override
-			public void run() {
+			public void go() throws Exception {
 				try {
 					FileSystem fs1 = FileSystem.getLocalFileSystem();
 					// ensure no safety net in place
@@ -67,11 +81,13 @@ public class SafetyNetCloseableRegistryTest {
 					fs1 = FileSystem.getLocalFileSystem();
 					// ensure safety net is in place now
 					Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem);
-					Path tmp = new Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString());
+
+					Path tmp = new Path(tmpFolder.newFolder().toURI().toString(), "test_file");
+
 					try (FSDataOutputStream stream = fs1.create(tmp, false)) {
-						Thread t2 = new Thread() {
+						CheckedThread t2 = new CheckedThread() {
 							@Override
-							public void run() {
+							public void go() {
 								FileSystem fs2 = FileSystem.getLocalFileSystem();
 								// ensure the safety net does not leak here
 								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
@@ -85,12 +101,9 @@ public class SafetyNetCloseableRegistryTest {
 								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
 							}
 						};
+
 						t2.start();
-						try {
-							t2.join();
-						} catch (InterruptedException e) {
-							Assert.fail();
-						}
+						t2.sync();
 
 						//ensure stream is still open and was never closed by any interferences
 						stream.write(42);
@@ -110,13 +123,13 @@ public class SafetyNetCloseableRegistryTest {
 						fs1.delete(tmp, false);
 					}
 				} catch (Exception e) {
-					e.printStackTrace();
-					Assert.fail();
+					Assert.fail(ExceptionUtils.stringifyException(e));
 				}
 			}
 		};
+
 		t1.start();
-		t1.join();
+		t1.sync();
 	}
 
 	@Test
@@ -177,6 +190,23 @@ public class SafetyNetCloseableRegistryTest {
 		closeableRegistry.close();
 	}
 
+	@Test
+	public void testReaperThreadSpawnAndStop() throws Exception {
+		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+
+		try (SafetyNetCloseableRegistry r1 = new SafetyNetCloseableRegistry()) {
+			Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
+
+			try (SafetyNetCloseableRegistry r2 = new SafetyNetCloseableRegistry()) {
+				Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
+			}
+			Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
+		}
+		Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+	}
+
+	//------------------------------------------------------------------------------------------------------------------
+
 	private static final class ProducerThread extends Thread {
 
 		private final SafetyNetCloseableRegistry registry;
@@ -205,13 +235,13 @@ public class SafetyNetCloseableRegistryTest {
 					String debug = Thread.currentThread().getName() + " " + count;
 					TestStream testStream = new TestStream(refCount);
 					refCount.incrementAndGet();
+
+					@SuppressWarnings("unused")
 					ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference dies here
 
 					try {
 						Thread.sleep(2);
-					} catch (InterruptedException e) {
-
-					}
+					} catch (InterruptedException ignored) {}
 
 					if (maxStreams != Integer.MAX_VALUE) {
 						--maxStreams;
@@ -219,7 +249,7 @@ public class SafetyNetCloseableRegistryTest {
 					++count;
 				}
 			} catch (Exception ex) {
-
+				// ignored
 			}
 		}
 	}