You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/29 14:32:22 UTC
flink git commit: [FLINK-5663] [runtime] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal
Repository: flink
Updated Branches:
refs/heads/release-1.2 3400a87dc -> 617ff50c6
[FLINK-5663] [runtime] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal
This closes #3229
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/617ff50c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/617ff50c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/617ff50c
Branch: refs/heads/release-1.2
Commit: 617ff50c6103aa5d5354b6339531dc38d62127ec
Parents: 3400a87
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri Jan 27 19:47:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 29 14:58:49 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 21 +++---
.../core/fs/SafetyNetCloseableRegistryTest.java | 71 +++++++++++++++++++-
.../apache/flink/runtime/taskmanager/Task.java | 12 +++-
3 files changed, 88 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/617ff50c/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 33addbb..d8efcbc 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OperatingSystem;
-
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +78,7 @@ public abstract class FileSystem {
// ------------------------------------------------------------------------
- private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+ private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
@@ -99,14 +99,15 @@ public abstract class FileSystem {
* main thread.
*/
@Internal
- public static void createFileSystemCloseableRegistryForTask() {
+ public static void createAndSetFileSystemCloseableRegistryForThread() {
SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
- if (null != oldRegistry) {
- IOUtils.closeQuietly(oldRegistry);
- LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it.");
- }
+ Preconditions.checkState(null == oldRegistry,
+ "Found old CloseableRegistry " + oldRegistry +
+ ". This indicates a leak of the InheritableThreadLocal through a ThreadPool!");
+
SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
REGISTRIES.set(newRegistry);
+ LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName());
}
/**
@@ -114,7 +115,7 @@ public abstract class FileSystem {
* main thread or when the task should be canceled.
*/
@Internal
- public static void disposeFileSystemCloseableRegistryForTask() {
+ public static void closeAndDisposeFileSystemCloseableRegistryForThread() {
SafetyNetCloseableRegistry registry = REGISTRIES.get();
if (null != registry) {
LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
@@ -123,7 +124,7 @@ public abstract class FileSystem {
}
}
- private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
+ private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
SafetyNetCloseableRegistry reg = REGISTRIES.get();
return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
}
@@ -306,7 +307,7 @@ public abstract class FileSystem {
* thrown if a reference to the file system instance could not be obtained
*/
public static FileSystem get(URI uri) throws IOException {
- return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
+ return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/617ff50c/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 6628407..40856b4 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -19,11 +19,11 @@
package org.apache.flink.core.fs;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
public class SafetyNetCloseableRegistryTest {
@@ -32,7 +32,6 @@ public class SafetyNetCloseableRegistryTest {
private SafetyNetCloseableRegistry closeableRegistry;
private AtomicInteger unclosedCounter;
- @Before
public void setup() {
this.closeableRegistry = new SafetyNetCloseableRegistry();
this.unclosedCounter = new AtomicInteger(0);
@@ -56,8 +55,74 @@ public class SafetyNetCloseableRegistryTest {
}
@Test
+ public void testCorrectScopesForSafetyNet() throws Exception {
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ FileSystem fs1 = FileSystem.getLocalFileSystem();
+ // ensure no safety net in place
+ Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
+ FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+ fs1 = FileSystem.getLocalFileSystem();
+ // ensure safety net is in place now
+ Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem);
+ Path tmp = new Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString());
+ try (FSDataOutputStream stream = fs1.create(tmp, false)) {
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ FileSystem fs2 = FileSystem.getLocalFileSystem();
+ // ensure the safety net does not leak here
+ Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
+ FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+ fs2 = FileSystem.getLocalFileSystem();
+ // ensure we can bring another safety net in place
+ Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem);
+ FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+ fs2 = FileSystem.getLocalFileSystem();
+ // and that we can remove it again
+ Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
+ }
+ };
+ t2.start();
+ try {
+ t2.join();
+ } catch (InterruptedException e) {
+ Assert.fail();
+ }
+
+ //ensure stream is still open and was never closed by any interferences
+ stream.write(42);
+ FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+
+ // ensure leaking stream was closed
+ try {
+ stream.write(43);
+ Assert.fail();
+ } catch (IOException ignore) {
+
+ }
+ fs1 = FileSystem.getLocalFileSystem();
+ // ensure safety net was removed
+ Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
+ } finally {
+ fs1.delete(tmp, false);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ };
+ t1.start();
+ t1.join();
+ }
+
+ @Test
public void testClose() throws Exception {
+ setup();
startThreads(Integer.MAX_VALUE);
for (int i = 0; i < 5; ++i) {
@@ -98,7 +163,7 @@ public class SafetyNetCloseableRegistryTest {
@Test
public void testSafetyNetClose() throws Exception {
-
+ setup();
startThreads(20);
joinThreads();
http://git-wip-us.apache.org/repos/asf/flink/blob/617ff50c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3c57e3f7..ff81827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -538,8 +538,8 @@ public class Task implements Runnable, TaskActions {
// check for canceling as a shortcut
// ----------------------------
- // init closeable registry for this task
- FileSystem.createFileSystemCloseableRegistryForTask();
+ // activate safety net for task thread
+ FileSystem.createAndSetFileSystemCloseableRegistryForThread();
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
@@ -763,7 +763,8 @@ public class Task implements Runnable, TaskActions {
// remove all files in the distributed cache
removeCachedFiles(distributedCacheEntries, fileCache);
- FileSystem.disposeFileSystemCloseableRegistryForTask();
+ // close and de-activate safety net for task thread
+ FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
notifyFinalState();
}
@@ -1104,6 +1105,8 @@ public class Task implements Runnable, TaskActions {
Runnable runnable = new Runnable() {
@Override
public void run() {
+ // activate safety net for checkpointing thread
+ FileSystem.createAndSetFileSystemCloseableRegistryForThread();
try {
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
if (!success) {
@@ -1122,6 +1125,9 @@ public class Task implements Runnable, TaskActions {
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
+ } finally {
+ // close and de-activate safety net for checkpointing thread
+ FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
}
}
};