You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 10:05:51 UTC

[2/4] flink git commit: [FLINK-7643] [core] Misc. cleanups in FileSystem

[FLINK-7643] [core] Misc. cleanups in FileSystem

  - Simplify access to local file system
  - Use a fair lock for all FileSystem.get() operations
  - Robust fallback to the local file system for the default scheme (avoids a URI parsing error on Windows)
  - Deprecate 'getDefaultBlockSize()'
  - Deprecate create(...) with block size and replication factor parameters, which are not applicable to many file systems


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b786844
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b786844
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b786844

Branch: refs/heads/master
Commit: 3b786844dd9c0ce176eac98c8a05ebe50cb1ebe7
Parents: 84a07a3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 2 14:34:27 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 5 19:14:45 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 133 +++++++++++--------
 .../flink/core/fs/local/LocalFileSystem.java    |  20 ++-
 2 files changed, 93 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b786844/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index fab0f4d..7c69425 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -30,7 +30,6 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.OperatingSystem;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -195,6 +194,8 @@ public abstract class FileSystem {
 	}
 
 	// ------------------------------------------------------------------------
+	//  File System Implementation Classes
+	// ------------------------------------------------------------------------
 
 	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
@@ -202,24 +203,33 @@ public abstract class FileSystem {
 
 	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
+	// ------------------------------------------------------------------------
+
 	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
 	 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
 	private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
 
-	// ------------------------------------------------------------------------
-
 	/** Object used to protect calls to specific methods.*/
-	private static final Object SYNCHRONIZATION_OBJECT = new Object();
+	private static final ReentrantLock LOCK = new ReentrantLock(true);
 
-	/**
-	 * Data structure mapping file system keys (scheme + authority) to cached file system objects.
-	 */
-	private static final Map<FSKey, FileSystem> CACHE = new HashMap<FSKey, FileSystem>();
+	/** Cache for file systems, by scheme + authority. */
+	private static final Map<FSKey, FileSystem> CACHE = new HashMap<>();
 
-	/**
-	 * Data structure mapping file system schemes to the corresponding implementations
-	 */
-	private static final Map<String, String> FSDIRECTORY = new HashMap<String, String>();
+	/** Mapping of file system schemes to  the corresponding implementations */
+	private static final Map<String, String> FSDIRECTORY = new HashMap<>();
+
+	/** The local file system. Needs to be lazily initialized to avoid that some JVMs deadlock
+	 * on static subclass initialization. */
+	private static LocalFileSystem LOCAL_FS;
+
+	/** The default filesystem scheme to be used, configured during process-wide initialization.
+	 * This value defaults to the local file systems scheme {@code 'file:///'} or
+	 * {@code 'file:/'}. */
+	private static URI defaultScheme;
+
+	// ------------------------------------------------------------------------
+	//  Initialization
+	// ------------------------------------------------------------------------
 
 	static {
 		FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
@@ -228,32 +238,6 @@ public abstract class FileSystem {
 	}
 
 	/**
-	 * Returns a reference to the {@link FileSystem} instance for accessing the
-	 * local file system.
-	 *
-	 * @return a reference to the {@link FileSystem} instance for accessing the
-	 *         local file system.
-	 */
-	public static FileSystem getLocalFileSystem() {
-		// this should really never fail.
-		try {
-			URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
-			return get(localUri);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Cannot create URI for local file system");
-		}
-	}
-
-	/**
-	 * The default filesystem scheme to be used. This can be specified by the parameter
-	 * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
-	 * set to <code>file:///</code> (see {@link ConfigConstants#FILESYSTEM_SCHEME}
-	 * and {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), and uses the local filesystem.
-	 */
-	private static URI defaultScheme;
-
-	/**
 	 * <p>
 	 * Sets the default filesystem scheme based on the user-specified configuration parameter
 	 * <code>fs.default-scheme</code>. By default this is set to <code>file:///</code>
@@ -269,26 +253,55 @@ public abstract class FileSystem {
 	 * @param config the configuration from where to fetch the parameter.
 	 */
 	public static void setDefaultScheme(Configuration config) throws IOException {
-		synchronized (SYNCHRONIZATION_OBJECT) {
+		LOCK.lock();
+		try {
 			if (defaultScheme == null) {
-				String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME,
-					ConfigConstants.DEFAULT_FILESYSTEM_SCHEME);
-				try {
-					defaultScheme = new URI(stringifiedUri);
-				} catch (URISyntaxException e) {
-					throw new IOException("The URI used to set the default filesystem " +
-						"scheme ('" + stringifiedUri + "') is not valid.");
+				final String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME, null);
+				if (stringifiedUri == null) {
+					defaultScheme = LocalFileSystem.getLocalFsURI();
+				}
+				else {
+					try {
+						defaultScheme = new URI(stringifiedUri);
+					} catch (URISyntaxException e) {
+						throw new IOException("The URI used to set the default filesystem " +
+								"scheme ('" + stringifiedUri + "') is not valid.");
+					}
 				}
 			}
 		}
+		finally {
+			LOCK.unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Obtaining File System Instances
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a reference to the {@link FileSystem} instance for accessing the local file system.
+	 *
+	 * @return a reference to the {@link FileSystem} instance for accessing the local file system.
+	 */
+	public static FileSystem getLocalFileSystem() {
+		LOCK.lock();
+		try {
+			if (LOCAL_FS == null) {
+				LOCAL_FS = new LocalFileSystem();
+			}
+			return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LOCAL_FS);
+		} finally {
+			LOCK.unlock();
+		}
 	}
 
 	@Internal
 	public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
-		FileSystem fs;
+		final URI asked = uri;
 
-		URI asked = uri;
-		synchronized (SYNCHRONIZATION_OBJECT) {
+		LOCK.lock();
+		try {
 
 			if (uri.getScheme() == null) {
 				try {
@@ -333,6 +346,7 @@ public abstract class FileSystem {
 			}
 
 			// Try to create a new file system
+			final FileSystem fs;
 
 			if (!isFlinkSupportedScheme(uri.getScheme())) {
 				// no build in support for this file system. Falling back to Hadoop's FileSystem impl.
@@ -369,9 +383,12 @@ public abstract class FileSystem {
 				// Add new file system object to cache
 				CACHE.put(key, fs);
 			}
-		}
 
-		return fs;
+			return fs;
+		}
+		finally {
+			LOCK.unlock();
+		}
 	}
 
 	/**
@@ -515,7 +532,10 @@ public abstract class FileSystem {
 	 * Return the number of bytes that large input files should be optimally be split into to minimize I/O time.
 	 *
 	 * @return the number of bytes that large input files should be optimally be split into to minimize I/O time
+	 * 
+	 * @deprecated This value is no longer used and is meaningless.
 	 */
+	@Deprecated
 	public long getDefaultBlockSize() {
 		return 32 * 1024 * 1024; // 32 MB;
 	}
@@ -598,8 +618,15 @@ public abstract class FileSystem {
 	 *             Control the behavior of specific file systems via configurations instead. 
 	 */
 	@Deprecated
-	public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
-			long blockSize) throws IOException;
+	public FSDataOutputStream create(
+			Path f,
+			boolean overwrite,
+			int bufferSize,
+			short replication,
+			long blockSize) throws IOException {
+
+		return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
+	}
 
 	/**
 	 * Opens an FSDataOutputStream at the indicated Path.

http://git-wip-us.apache.org/repos/asf/flink/blob/3b786844/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 0e3e9f3..f21a481 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -258,13 +258,6 @@ public class LocalFileSystem extends FileSystem {
 	}
 
 	@Override
-	public FSDataOutputStream create(
-			Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
-		return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
-	}
-
-
-	@Override
 	public boolean rename(final Path src, final Path dst) throws IOException {
 		final File srcFile = pathToFile(src);
 		final File dstFile = pathToFile(dst);
@@ -289,4 +282,17 @@ public class LocalFileSystem extends FileSystem {
 	public boolean isDistributedFS() {
 		return false;
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the URI that represents the local file system.
+	 * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
+	 * UNIX family platforms.
+	 * 
+	 * @return The URI that represents the local file system.
+	 */
+	public static URI getLocalFsURI() {
+		return uri;
+	}
 }