You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:33 UTC
[19/19] flink git commit: [FLINK-5812] [core] Cleanups in FileSystem
(round 2)
[FLINK-5812] [core] Cleanups in FileSystem (round 2)
Move the FileSystem safety net to a separate class.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5902ea0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5902ea0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5902ea0e
Branch: refs/heads/master
Commit: 5902ea0e88c70f330c23b9ace94033ae34c84445
Parents: a1bfae9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 15 17:58:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100
----------------------------------------------------------------------
flink-core/pom.xml | 2 +-
.../org/apache/flink/core/fs/FileSystem.java | 52 +-------
.../flink/core/fs/FileSystemSafetyNet.java | 124 +++++++++++++++++++
.../flink/util/AbstractCloseableRegistry.java | 4 -
.../core/fs/SafetyNetCloseableRegistryTest.java | 8 +-
.../apache/flink/runtime/taskmanager/Task.java | 11 +-
6 files changed, 140 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index e9738a2..0a0d06e 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -154,7 +154,7 @@ under the License.
<parameter>
<excludes combine.children="append">
<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
- <exclude>org.apache.flink.core.fs.FileSystem$FSKey</exclude>
+ <exclude>org.apache.flink.core.fs.FileSystem\$FSKey</exclude>
<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
<!-- Breaking changes between 1.1 and 1.2.
We ignore these changes because these are low-level, internal runtime configuration parameters -->
http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 4149d5e..fab0f4d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -17,7 +17,7 @@
*/
-/**
+/*
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
@@ -30,12 +30,7 @@ import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
@@ -174,6 +169,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* application task finishes (or is canceled or failed). That way, the task's threads do not
* leak connections.
*
+ * <p>Internal runtime code can explicitly obtain a FileSystem that does not use the safety
+ * net via {@link FileSystem#getUnguardedFileSystem(URI)}.
+ *
* @see FSDataInputStream
* @see FSDataOutputStream
*/
@@ -198,57 +196,18 @@ public abstract class FileSystem {
// ------------------------------------------------------------------------
- private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
-
private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
- private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
-
/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
* {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
// ------------------------------------------------------------------------
- /**
- * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's
- * main thread.
- */
- @Internal
- public static void createAndSetFileSystemCloseableRegistryForThread() {
- SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
- Preconditions.checkState(null == oldRegistry,
- "Found old CloseableRegistry " + oldRegistry +
- ". This indicates a leak of the InheritableThreadLocal through a ThreadPool!");
-
- SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
- REGISTRIES.set(newRegistry);
- LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName());
- }
-
- /**
- * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the end of the task's
- * main thread or when the task should be canceled.
- */
- @Internal
- public static void closeAndDisposeFileSystemCloseableRegistryForThread() {
- SafetyNetCloseableRegistry registry = REGISTRIES.get();
- if (null != registry) {
- LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
- REGISTRIES.remove();
- IOUtils.closeQuietly(registry);
- }
- }
-
- private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
- SafetyNetCloseableRegistry reg = REGISTRIES.get();
- return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
- }
-
/** Object used to protect calls to specific methods.*/
private static final Object SYNCHRONIZATION_OBJECT = new Object();
@@ -427,7 +386,7 @@ public abstract class FileSystem {
* thrown if a reference to the file system instance could not be obtained
*/
public static FileSystem get(URI uri) throws IOException {
- return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
+ return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
}
/**
@@ -971,7 +930,6 @@ public abstract class FileSystem {
/**
* An identifier of a file system, via its scheme and its authority.
- * This class needs to stay public, because it is detected as part of the public API.
*/
private static final class FSKey {
http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
new file mode 100644
index 0000000..b18cb13
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The FileSystemSafetyNet can be used to guard a thread against {@link FileSystem} stream resource leaks.
+ * When activated for a thread, it tracks all streams that are opened by FileSystems that the thread
+ * obtains. The safety net has a global cleanup hook that will close all streams that were
+ * not properly closed.
+ *
+ * <p>The main thread of each Flink task, as well as the checkpointing thread, is automatically guarded
+ * by this safety net.
+ *
+ * <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction,
+ * i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through
+ * {@link Path#getFileSystem()}.
+ *
+ * <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them
+ * to another thread, the safety net will close those resources once the former thread finishes.
+ *
+ * <p>The safety net can be used as follows:
+ * <pre>{@code
+ *
+ * class GuardedThread extends Thread {
+ *
+ * public void run() {
+ * FileSystemSafetyNet.initializeSafetyNetForThread();
+ * try {
+ * // do some heavy stuff where you are unsure whether it closes all streams
+ * // like some untrusted user code or library code
+ * }
+ * finally {
+ * FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ * }
+ * }
+ * }
+ * }</pre>
+ */
+@Internal
+public class FileSystemSafetyNet {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class);
+
+ /** The map from thread to the safety net registry for that thread */
+ private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
+
+ // ------------------------------------------------------------------------
+ // Activating / Deactivating
+ // ------------------------------------------------------------------------
+
+ /**
+ * Activates the safety net for a thread. {@link FileSystem} instances obtained by the thread
+ * that called this method will be guarded, meaning that their created streams are tracked and can
+ * be closed via the safety net closing hook.
+ *
+ * <p>This method should be called at the beginning of a thread that should be guarded.
+ *
+ * @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
+ */
+ @Internal
+ public static void initializeSafetyNetForThread() {
+ SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
+ // a registry that is already present means this thread was initialized twice or inherited a stale one
+ checkState(null == oldRegistry, "Found an existing FileSystem safety net for this thread: %s " +
+ "This may indicate an accidental repeated initialization, or a leak of the " +
+ "(Inheritable)ThreadLocal through a ThreadPool.", oldRegistry);
+
+ SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
+ REGISTRIES.set(newRegistry);
+ LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName());
+ }
+
+ /**
+ * Closes the safety net for a thread. This closes all remaining unclosed streams that were opened
+ * by safety-net-guarded file systems. After this method was called, no streams can be opened any more
+ * from any FileSystem instance that was obtained while the thread was guarded by the safety net.
+ *
+ * <p>This method should be called at the very end of a guarded thread.
+ */
+ @Internal
+ public static void closeSafetyNetAndGuardedResourcesForThread() {
+ SafetyNetCloseableRegistry registry = REGISTRIES.get();
+ if (null != registry) {
+ LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
+ REGISTRIES.remove();
+ IOUtils.closeQuietly(registry);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
+ SafetyNetCloseableRegistry reg = REGISTRIES.get();
+ return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 2b7a8c8..766ede9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -108,10 +108,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
return closeableToRef;
}
- // ------------------------------------------------------------------------
- //
- // ------------------------------------------------------------------------
-
protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 6870780..7973c69 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -77,7 +77,7 @@ public class SafetyNetCloseableRegistryTest {
FileSystem fs1 = FileSystem.getLocalFileSystem();
// ensure no safety net in place
Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
- FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.initializeSafetyNetForThread();
fs1 = FileSystem.getLocalFileSystem();
// ensure safety net is in place now
Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem);
@@ -91,11 +91,11 @@ public class SafetyNetCloseableRegistryTest {
FileSystem fs2 = FileSystem.getLocalFileSystem();
// ensure the safety net does not leak here
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
- FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.initializeSafetyNetForThread();
fs2 = FileSystem.getLocalFileSystem();
// ensure we can bring another safety net in place
Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem);
- FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
fs2 = FileSystem.getLocalFileSystem();
// and that we can remove it again
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
@@ -107,7 +107,7 @@ public class SafetyNetCloseableRegistryTest {
//ensure stream is still open and was never closed by any interferences
stream.write(42);
- FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
// ensure leaking stream was closed
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 64a83c9..acb423b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
@@ -552,7 +552,7 @@ public class Task implements Runnable, TaskActions {
// ----------------------------
// activate safety net for task thread
- FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.initializeSafetyNetForThread();
// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
@@ -789,8 +789,9 @@ public class Task implements Runnable, TaskActions {
// remove all files in the distributed cache
removeCachedFiles(distributedCacheEntries, fileCache);
+
// close and de-activate safety net for task thread
- FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
notifyFinalState();
}
@@ -1131,7 +1132,7 @@ public class Task implements Runnable, TaskActions {
@Override
public void run() {
// activate safety net for checkpointing thread
- FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.initializeSafetyNetForThread();
try {
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
if (!success) {
@@ -1152,7 +1153,7 @@ public class Task implements Runnable, TaskActions {
}
} finally {
// close and de-activate safety net for checkpointing thread
- FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
};