You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 10:05:51 UTC
[2/4] flink git commit: [FLINK-7643] [core] Misc. cleanups in
FileSystem
[FLINK-7643] [core] Misc. cleanups in FileSystem
- Simplify access to local file system
- Use a fair lock for all FileSystem.get() operations
- Robust fallback to local fs for default scheme (avoids URI parsing error on Windows)
- Deprecate 'getDefaultBlockSize()'
- Deprecate create(...) with block sizes and replication factor, which are not applicable to many file systems
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b786844
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b786844
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b786844
Branch: refs/heads/master
Commit: 3b786844dd9c0ce176eac98c8a05ebe50cb1ebe7
Parents: 84a07a3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 2 14:34:27 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 5 19:14:45 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 133 +++++++++++--------
.../flink/core/fs/local/LocalFileSystem.java | 20 ++-
2 files changed, 93 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b786844/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index fab0f4d..7c69425 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -30,7 +30,6 @@ import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.util.OperatingSystem;
import javax.annotation.Nullable;
import java.io.File;
@@ -195,6 +194,8 @@ public abstract class FileSystem {
}
// ------------------------------------------------------------------------
+ // File System Implementation Classes
+ // ------------------------------------------------------------------------
private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
@@ -202,24 +203,33 @@ public abstract class FileSystem {
private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
+ // ------------------------------------------------------------------------
+
/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
* {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
- // ------------------------------------------------------------------------
-
/** Object used to protect calls to specific methods.*/
- private static final Object SYNCHRONIZATION_OBJECT = new Object();
+ private static final ReentrantLock LOCK = new ReentrantLock(true);
- /**
- * Data structure mapping file system keys (scheme + authority) to cached file system objects.
- */
- private static final Map<FSKey, FileSystem> CACHE = new HashMap<FSKey, FileSystem>();
+ /** Cache for file systems, by scheme + authority. */
+ private static final Map<FSKey, FileSystem> CACHE = new HashMap<>();
- /**
- * Data structure mapping file system schemes to the corresponding implementations
- */
- private static final Map<String, String> FSDIRECTORY = new HashMap<String, String>();
+ /** Mapping of file system schemes to the corresponding implementations */
+ private static final Map<String, String> FSDIRECTORY = new HashMap<>();
+
+ /** The local file system. Needs to be lazily initialized to avoid that some JVMs deadlock
+ * on static subclass initialization. */
+ private static LocalFileSystem LOCAL_FS;
+
+ /** The default filesystem scheme to be used, configured during process-wide initialization.
+ * This value defaults to the local file systems scheme {@code 'file:///'} or
+ * {@code 'file:/'}. */
+ private static URI defaultScheme;
+
+ // ------------------------------------------------------------------------
+ // Initialization
+ // ------------------------------------------------------------------------
static {
FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
@@ -228,32 +238,6 @@ public abstract class FileSystem {
}
/**
- * Returns a reference to the {@link FileSystem} instance for accessing the
- * local file system.
- *
- * @return a reference to the {@link FileSystem} instance for accessing the
- * local file system.
- */
- public static FileSystem getLocalFileSystem() {
- // this should really never fail.
- try {
- URI localUri = OperatingSystem.isWindows() ? new URI("file:/") : new URI("file:///");
- return get(localUri);
- }
- catch (Exception e) {
- throw new RuntimeException("Cannot create URI for local file system");
- }
- }
-
- /**
- * The default filesystem scheme to be used. This can be specified by the parameter
- * <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
- * set to <code>file:///</code> (see {@link ConfigConstants#FILESYSTEM_SCHEME}
- * and {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), and uses the local filesystem.
- */
- private static URI defaultScheme;
-
- /**
* <p>
* Sets the default filesystem scheme based on the user-specified configuration parameter
* <code>fs.default-scheme</code>. By default this is set to <code>file:///</code>
@@ -269,26 +253,55 @@ public abstract class FileSystem {
* @param config the configuration from where to fetch the parameter.
*/
public static void setDefaultScheme(Configuration config) throws IOException {
- synchronized (SYNCHRONIZATION_OBJECT) {
+ LOCK.lock();
+ try {
if (defaultScheme == null) {
- String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME,
- ConfigConstants.DEFAULT_FILESYSTEM_SCHEME);
- try {
- defaultScheme = new URI(stringifiedUri);
- } catch (URISyntaxException e) {
- throw new IOException("The URI used to set the default filesystem " +
- "scheme ('" + stringifiedUri + "') is not valid.");
+ final String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME, null);
+ if (stringifiedUri == null) {
+ defaultScheme = LocalFileSystem.getLocalFsURI();
+ }
+ else {
+ try {
+ defaultScheme = new URI(stringifiedUri);
+ } catch (URISyntaxException e) {
+ throw new IOException("The URI used to set the default filesystem " +
+ "scheme ('" + stringifiedUri + "') is not valid.");
+ }
}
}
}
+ finally {
+ LOCK.unlock();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Obtaining File System Instances
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns a reference to the {@link FileSystem} instance for accessing the local file system.
+ *
+ * @return a reference to the {@link FileSystem} instance for accessing the local file system.
+ */
+ public static FileSystem getLocalFileSystem() {
+ LOCK.lock();
+ try {
+ if (LOCAL_FS == null) {
+ LOCAL_FS = new LocalFileSystem();
+ }
+ return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LOCAL_FS);
+ } finally {
+ LOCK.unlock();
+ }
}
@Internal
public static FileSystem getUnguardedFileSystem(URI uri) throws IOException {
- FileSystem fs;
+ final URI asked = uri;
- URI asked = uri;
- synchronized (SYNCHRONIZATION_OBJECT) {
+ LOCK.lock();
+ try {
if (uri.getScheme() == null) {
try {
@@ -333,6 +346,7 @@ public abstract class FileSystem {
}
// Try to create a new file system
+ final FileSystem fs;
if (!isFlinkSupportedScheme(uri.getScheme())) {
// no build in support for this file system. Falling back to Hadoop's FileSystem impl.
@@ -369,9 +383,12 @@ public abstract class FileSystem {
// Add new file system object to cache
CACHE.put(key, fs);
}
- }
- return fs;
+ return fs;
+ }
+ finally {
+ LOCK.unlock();
+ }
}
/**
@@ -515,7 +532,10 @@ public abstract class FileSystem {
* Return the number of bytes that large input files should be optimally be split into to minimize I/O time.
*
* @return the number of bytes that large input files should be optimally be split into to minimize I/O time
+ *
+ * @deprecated This value is no longer used and is meaningless.
*/
+ @Deprecated
public long getDefaultBlockSize() {
return 32 * 1024 * 1024; // 32 MB;
}
@@ -598,8 +618,15 @@ public abstract class FileSystem {
* Control the behavior of specific file systems via configurations instead.
*/
@Deprecated
- public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
- long blockSize) throws IOException;
+ public FSDataOutputStream create(
+ Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) throws IOException {
+
+ return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
+ }
/**
* Opens an FSDataOutputStream at the indicated Path.
http://git-wip-us.apache.org/repos/asf/flink/blob/3b786844/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 0e3e9f3..f21a481 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -258,13 +258,6 @@ public class LocalFileSystem extends FileSystem {
}
@Override
- public FSDataOutputStream create(
- Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
- return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
- }
-
-
- @Override
public boolean rename(final Path src, final Path dst) throws IOException {
final File srcFile = pathToFile(src);
final File dstFile = pathToFile(dst);
@@ -289,4 +282,17 @@ public class LocalFileSystem extends FileSystem {
public boolean isDistributedFS() {
return false;
}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the URI that represents the local file system.
+ * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
+ * UNIX family platforms.
+ *
+ * @return The URI that represents the local file system.
+ */
+ public static URI getLocalFsURI() {
+ return uri;
+ }
}