You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:33 UTC

[19/19] flink git commit: [FLINK-5812] [core] Cleanups in FileSystem (round 2)

[FLINK-5812] [core] Cleanups in FileSystem (round 2)

Move the FileSystem safety net to a separate class.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5902ea0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5902ea0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5902ea0e

Branch: refs/heads/master
Commit: 5902ea0e88c70f330c23b9ace94033ae34c84445
Parents: a1bfae9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 15 17:58:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100

----------------------------------------------------------------------
 flink-core/pom.xml                              |   2 +-
 .../org/apache/flink/core/fs/FileSystem.java    |  52 +-------
 .../flink/core/fs/FileSystemSafetyNet.java      | 124 +++++++++++++++++++
 .../flink/util/AbstractCloseableRegistry.java   |   4 -
 .../core/fs/SafetyNetCloseableRegistryTest.java |   8 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  11 +-
 6 files changed, 140 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index e9738a2..0a0d06e 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -154,7 +154,7 @@ under the License.
 					<parameter>
 						<excludes combine.children="append">
 							<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
-							<exclude>org.apache.flink.core.fs.FileSystem$FSKey</exclude>
+							<exclude>org.apache.flink.core.fs.FileSystem\$FSKey</exclude>
 							<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
 							<!-- Breaking changes between 1.1 and 1.2. 
 							We ignore these changes because these are low-level, internal runtime configuration parameters -->

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 4149d5e..fab0f4d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -17,7 +17,7 @@
  */
 
 
-/**
+/*
  * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
  * additional information regarding copyright ownership.
@@ -30,12 +30,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -174,6 +169,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * application task finishes (or is canceled or failed). That way, the task's threads do not
  * leak connections.
  * 
+ * <p>Internal runtime code can explicitly obtain a FileSystem that does not use the safety
+ * net via {@link FileSystem#getUnguardedFileSystem(URI)}.
+ * 
  * @see FSDataInputStream
  * @see FSDataOutputStream
  */
@@ -198,57 +196,18 @@ public abstract class FileSystem {
 
 	// ------------------------------------------------------------------------
 
-	private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
-
 	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
 	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
 
 	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
-
 	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
 	 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
 	private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
 
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's
-	 * main thread.
-	 */
-	@Internal
-	public static void createAndSetFileSystemCloseableRegistryForThread() {
-		SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
-		Preconditions.checkState(null == oldRegistry,
-				"Found old CloseableRegistry " + oldRegistry +
-						". This indicates a leak of the InheritableThreadLocal through a ThreadPool!");
-
-		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
-		REGISTRIES.set(newRegistry);
-		LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName());
-	}
-
-	/**
-	 * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the end of the task's
-	 * main thread or when the task should be canceled.
-	 */
-	@Internal
-	public static void closeAndDisposeFileSystemCloseableRegistryForThread() {
-		SafetyNetCloseableRegistry registry = REGISTRIES.get();
-		if (null != registry) {
-			LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
-			REGISTRIES.remove();
-			IOUtils.closeQuietly(registry);
-		}
-	}
-
-	private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
-		SafetyNetCloseableRegistry reg = REGISTRIES.get();
-		return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
-	}
-
 	/** Object used to protect calls to specific methods.*/
 	private static final Object SYNCHRONIZATION_OBJECT = new Object();
 
@@ -427,7 +386,7 @@ public abstract class FileSystem {
 	 *         thrown if a reference to the file system instance could not be obtained
 	 */
 	public static FileSystem get(URI uri) throws IOException {
-		return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
+		return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
 	}
 
 	/**
@@ -971,7 +930,6 @@ public abstract class FileSystem {
 
 	/**
 	 * An identifier of a file system, via its scheme and its authority.
-	 * This class needs to stay public, because it is detected as part of the public API.
 	 */
 	private static final class FSKey {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
new file mode 100644
index 0000000..b18cb13
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The FileSystemSafetyNet can be used to guard a thread against {@link FileSystem} stream resource leaks.
+ * When activated for a thread, it tracks all streams that are opened by FileSystems that the thread
+ * obtains. The safety net has a global cleanup hook that will close all streams that were
+ * not properly closed.
+ * 
+ * <p>The main thread of each Flink task and the checkpointing thread are automatically guarded
+ * by this safety net.
+ * 
+ * <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction,
+ * i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through
+ * {@link Path#getFileSystem()}.
+ * 
+ * <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them
+ * to another thread, the safety net will close those resources once the former thread finishes.
+ * 
+ * <p>The safety net can be used as follows:
+ * <pre>{@code
+ * class GuardedThread extends Thread {
+ * 
+ *     public void run() {
+ *         FileSystemSafetyNet.initializeSafetyNetForThread();
+ *         try {
+ *             // do some heavy stuff where you are unsure whether it closes all streams
+ *             // like some untrusted user code or library code
+ *         }
+ *         finally {
+ *             FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ *         }
+ *     }
+ * }
+ * }</pre>
+ */
+@Internal
+public class FileSystemSafetyNet {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class);
+
+	/** The map from thread to the safety net registry for that thread. */
+	private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
+
+	// ------------------------------------------------------------------------
+	//  Activating / Deactivating
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Activates the safety net for a thread. {@link FileSystem} instances obtained by the thread
+	 * that called this method will be guarded, meaning that their created streams are tracked and can
+	 * be closed via the safety net closing hook.
+	 * 
+	 * <p>This method should be called at the beginning of a thread that should be guarded.
+	 * 
+	 * @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
+	 */
+	@Internal
+	public static void initializeSafetyNetForThread() {
+		SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
+
+		checkState(null == oldRegistry, "Found an existing FileSystem safety net for this thread: %s. " +
+				"This may indicate an accidental repeated initialization, or a leak of the " +
+				"(Inheritable)ThreadLocal through a ThreadPool.", oldRegistry);
+
+		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
+		REGISTRIES.set(newRegistry);
+		LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName());
+	}
+
+	/**
+	 * Closes the safety net for a thread. This closes all remaining unclosed streams that were opened
+	 * by safety-net-guarded file systems. After this method was called, no streams can be opened any more
+	 * from any FileSystem instance that was obtained while the thread was guarded by the safety net.
+	 * 
+	 * <p>This method should be called at the very end of a guarded thread.
+	 */
+	@Internal
+	public static void closeSafetyNetAndGuardedResourcesForThread() {
+		SafetyNetCloseableRegistry registry = REGISTRIES.get();
+		if (null != registry) {
+			LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
+			REGISTRIES.remove();
+			IOUtils.closeQuietly(registry);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/** Wraps {@code fs} with the calling thread's safety net if one is active; otherwise returns {@code fs} unchanged. */
+	static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
+		SafetyNetCloseableRegistry reg = REGISTRIES.get();
+		return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 2b7a8c8..766ede9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -108,10 +108,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 		return closeableToRef;
 	}
 
-	// ------------------------------------------------------------------------
-	//  
-	// ------------------------------------------------------------------------
-
 	protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
 
 	protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 6870780..7973c69 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -77,7 +77,7 @@ public class SafetyNetCloseableRegistryTest {
 					FileSystem fs1 = FileSystem.getLocalFileSystem();
 					// ensure no safety net in place
 					Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem);
-					FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+					FileSystemSafetyNet.initializeSafetyNetForThread();
 					fs1 = FileSystem.getLocalFileSystem();
 					// ensure safety net is in place now
 					Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem);
@@ -91,11 +91,11 @@ public class SafetyNetCloseableRegistryTest {
 								FileSystem fs2 = FileSystem.getLocalFileSystem();
 								// ensure the safety net does not leak here
 								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
-								FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+								FileSystemSafetyNet.initializeSafetyNetForThread();
 								fs2 = FileSystem.getLocalFileSystem();
 								// ensure we can bring another safety net in place
 								Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem);
-								FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+								FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 								fs2 = FileSystem.getLocalFileSystem();
 								// and that we can remove it again
 								Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
@@ -107,7 +107,7 @@ public class SafetyNetCloseableRegistryTest {
 
 						//ensure stream is still open and was never closed by any interferences
 						stream.write(42);
-						FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+						FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
 						// ensure leaking stream was closed
 						try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5902ea0e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 64a83c9..acb423b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -552,7 +552,7 @@ public class Task implements Runnable, TaskActions {
 			// ----------------------------
 
 			// activate safety net for task thread
-			FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+			FileSystemSafetyNet.initializeSafetyNetForThread();
 
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
@@ -789,8 +789,9 @@ public class Task implements Runnable, TaskActions {
 
 				// remove all files in the distributed cache
 				removeCachedFiles(distributedCacheEntries, fileCache);
+
 				// close and de-activate safety net for task thread
-				FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
 				notifyFinalState();
 			}
@@ -1131,7 +1132,7 @@ public class Task implements Runnable, TaskActions {
 					@Override
 					public void run() {
 						// activate safety net for checkpointing thread
-						FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+						FileSystemSafetyNet.initializeSafetyNetForThread();
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
 							if (!success) {
@@ -1152,7 +1153,7 @@ public class Task implements Runnable, TaskActions {
 							}
 						} finally {
 							// close and de-activate safety net for checkpointing thread
-							FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+							FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 						}
 					}
 				};