You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/29 14:32:22 UTC

flink git commit: [FLINK-5663] [runtime] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal

Repository: flink
Updated Branches:
  refs/heads/release-1.2 3400a87dc -> 617ff50c6


[FLINK-5663] [runtime] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal

This closes #3229


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/617ff50c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/617ff50c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/617ff50c

Branch: refs/heads/release-1.2
Commit: 617ff50c6103aa5d5354b6339531dc38d62127ec
Parents: 3400a87
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Jan 27 19:47:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 29 14:58:49 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 21 +++---
 .../core/fs/SafetyNetCloseableRegistryTest.java | 71 +++++++++++++++++++-
 .../apache/flink/runtime/taskmanager/Task.java  | 12 +++-
 3 files changed, 88 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/617ff50c/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 33addbb..d8efcbc 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
-
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +78,7 @@ public abstract class FileSystem {
 
 	// ------------------------------------------------------------------------
 
-	private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+	private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
 
 	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
@@ -99,14 +99,15 @@ public abstract class FileSystem {
 	 * main thread.
 	 */
 	@Internal
-	public static void createFileSystemCloseableRegistryForTask() {
+	public static void createAndSetFileSystemCloseableRegistryForThread() {
 		SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
-		if (null != oldRegistry) {
-			IOUtils.closeQuietly(oldRegistry);
-			LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it.");
-		}
+		Preconditions.checkState(null == oldRegistry,
+				"Found old CloseableRegistry " + oldRegistry +
+						". This indicates a leak of the InheritableThreadLocal through a ThreadPool!");
+
 		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
 		REGISTRIES.set(newRegistry);
+		LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName());
 	}
 
 	/**
@@ -114,7 +115,7 @@ public abstract class FileSystem {
 	 * main thread or when the task should be canceled.
 	 */
 	@Internal
-	public static void disposeFileSystemCloseableRegistryForTask() {
+	public static void closeAndDisposeFileSystemCloseableRegistryForThread() {
 		SafetyNetCloseableRegistry registry = REGISTRIES.get();
 		if (null != registry) {
 			LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
@@ -123,7 +124,7 @@ public abstract class FileSystem {
 		}
 	}
 
-	private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
+	private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
 		SafetyNetCloseableRegistry reg = REGISTRIES.get();
 		return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
 	}
@@ -306,7 +307,7 @@ public abstract class FileSystem {
 	 *         thrown if a reference to the file system instance could not be obtained
 	 */
 	public static FileSystem get(URI uri) throws IOException {
-		return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
+		return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/617ff50c/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 6628407..40856b4 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.core.fs;
 
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class SafetyNetCloseableRegistryTest {
@@ -32,7 +32,6 @@ public class SafetyNetCloseableRegistryTest {
 	private SafetyNetCloseableRegistry closeableRegistry;
 	private AtomicInteger unclosedCounter;
 
-	@Before
 	public void setup() {
 		this.closeableRegistry = new SafetyNetCloseableRegistry();
 		this.unclosedCounter = new AtomicInteger(0);
@@ -56,8 +55,74 @@ public class SafetyNetCloseableRegistryTest {
 	}
 
 	@Test
+	public void testCorrectScopesForSafetyNet() throws Exception {
+		Thread t1 = new Thread() {
+			@Override
+			public void run() {
+				try {
+					FileSystem fs1 = FileSystem.getLocalFileSystem();
+					// ensure no safety net in place
+					Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
+					FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+					fs1 = FileSystem.getLocalFileSystem();
+					// ensure safety net is in place now
+					Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem);
+					Path tmp = new Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString());
+					try (FSDataOutputStream stream = fs1.create(tmp, false)) {
+						Thread t2 = new Thread() {
+							@Override
+							public void run() {
+								FileSystem fs2 = FileSystem.getLocalFileSystem();
+								// ensure the safety net does not leak here
+								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
+								FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+								fs2 = FileSystem.getLocalFileSystem();
+								// ensure we can bring another safety net in place
+								Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem);
+								FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+								fs2 = FileSystem.getLocalFileSystem();
+								// and that we can remove it again
+								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
+							}
+						};
+						t2.start();
+						try {
+							t2.join();
+						} catch (InterruptedException e) {
+							Assert.fail();
+						}
+
+						//ensure stream is still open and was never closed by any interferences
+						stream.write(42);
+						FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+
+						// ensure leaking stream was closed
+						try {
+							stream.write(43);
+							Assert.fail();
+						} catch (IOException ignore) {
+
+						}
+						fs1 = FileSystem.getLocalFileSystem();
+						// ensure safety net was removed
+						Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
+					} finally {
+						fs1.delete(tmp, false);
+					}
+				} catch (Exception e) {
+					e.printStackTrace();
+					Assert.fail();
+				}
+			}
+		};
+		t1.start();
+		t1.join();
+	}
+
+	@Test
 	public void testClose() throws Exception {
 
+		setup();
 		startThreads(Integer.MAX_VALUE);
 
 		for (int i = 0; i < 5; ++i) {
@@ -98,7 +163,7 @@ public class SafetyNetCloseableRegistryTest {
 
 	@Test
 	public void testSafetyNetClose() throws Exception {
-
+		setup();
 		startThreads(20);
 
 		joinThreads();

http://git-wip-us.apache.org/repos/asf/flink/blob/617ff50c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3c57e3f7..ff81827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -538,8 +538,8 @@ public class Task implements Runnable, TaskActions {
 			//  check for canceling as a shortcut
 			// ----------------------------
 
-			// init closeable registry for this task
-			FileSystem.createFileSystemCloseableRegistryForTask();
+			// activate safety net for task thread
+			FileSystem.createAndSetFileSystemCloseableRegistryForThread();
 
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
@@ -763,7 +763,8 @@ public class Task implements Runnable, TaskActions {
 
 				// remove all files in the distributed cache
 				removeCachedFiles(distributedCacheEntries, fileCache);
-				FileSystem.disposeFileSystemCloseableRegistryForTask();
+				// close and de-activate safety net for task thread
+				FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
 
 				notifyFinalState();
 			}
@@ -1104,6 +1105,8 @@ public class Task implements Runnable, TaskActions {
 				Runnable runnable = new Runnable() {
 					@Override
 					public void run() {
+						// activate safety net for checkpointing thread
+						FileSystem.createAndSetFileSystemCloseableRegistryForThread();
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
 							if (!success) {
@@ -1122,6 +1125,9 @@ public class Task implements Runnable, TaskActions {
 									"{} ({}) while being not in state running.", checkpointID,
 									taskNameWithSubtask, executionId, t);
 							}
+						} finally {
+							// close and de-activate safety net for checkpointing thread
+							FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
 						}
 					}
 				};