You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:11 UTC
[04/12] flink git commit: [hotfix] Clean up structure and comments in
'FileSystem'
[hotfix] Clean up structure and comments in 'FileSystem'
This commit aims to improve the readability of the FileSystem class.
The commit does not introduce new/different code, but introduces sections in the class, moves nested
classes and methods between these sections.
The commit also improves comments for the class and nested classes.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ecf6c86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ecf6c86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ecf6c86
Branch: refs/heads/master
Commit: 7ecf6c86de2a782ba0aa34439c3387da82f42808
Parents: 3560f2e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 16:54:39 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100
----------------------------------------------------------------------
flink-core/pom.xml | 1 +
.../org/apache/flink/core/fs/FileSystem.java | 194 ++++++++++---------
2 files changed, 100 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7ecf6c86/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ffbfe70..396ef86 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -125,6 +125,7 @@ under the License.
<parameter>
<excludes combine.children="append">
<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
+ <exclude>org.apache.flink.core.fs.FileSystem$FSKey</exclude>
<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
</excludes>
</parameter>
http://git-wip-us.apache.org/repos/asf/flink/blob/7ecf6c86/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 433cec0..3dced6f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -28,11 +28,14 @@ package org.apache.flink.core.fs;
import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OperatingSystem;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -42,17 +45,38 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * An abstract base class for a fairly generic file system. It
- * may be implemented as a distributed file system, or as a local
- * one that reflects the locally-connected disk.
+ * Abstract base class of all file systems used by Flink. This class may be extended to implement
+ * distributed file systems, or local file systems. The abstraction by this file system is very simple,
+ * and teh set of allowed operations quite limited, to support the common denominator of a wide
+ * range of file systems. For example, appending to or mutating existing files is not supported.
+ *
+ * <p>Flink implements and supports some file system types directly (for example the default
+ * machine-local file system). Other file system types are accessed by an implementation that bridges
+ * to the suite of file systems supported by Hadoop (such as for example HDFS).
*/
@Public
public abstract class FileSystem {
- private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+ /**
+ * The possible write modes. The write mode decides what happens if a file should be created,
+ * but already exists.
+ */
+ public enum WriteMode {
+
+ /** Creates the target file if it does not exist. Does not overwrite existing files and directories. */
+ NO_OVERWRITE,
+
+ /** Creates a new target file regardless of any existing files or directories. Existing files and
+ * directories will be removed/overwritten. */
+ OVERWRITE
+ }
- private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
+ // ------------------------------------------------------------------------
+
+ private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
@@ -98,89 +122,6 @@ public abstract class FileSystem {
private static final Object SYNCHRONIZATION_OBJECT = new Object();
/**
- * Enumeration for write modes.
- *
- */
- public enum WriteMode {
-
- /** Creates write path if it does not exist. Does not overwrite existing files and directories. */
- NO_OVERWRITE,
-
- /** Creates write path if it does not exist. Overwrites existing files and directories. */
- OVERWRITE
- }
-
- /**
- * An auxiliary class to identify a file system by its scheme and its authority.
- */
- public static class FSKey {
-
- /**
- * The scheme of the file system.
- */
- private String scheme;
-
- /**
- * The authority of the file system.
- */
- private String authority;
-
- /**
- * Creates a file system key from a given scheme and an
- * authority.
- *
- * @param scheme
- * the scheme of the file system
- * @param authority
- * the authority of the file system
- */
- public FSKey(final String scheme, final String authority) {
- this.scheme = scheme;
- this.authority = authority;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (obj instanceof FSKey) {
- final FSKey key = (FSKey) obj;
-
- if (!this.scheme.equals(key.scheme)) {
- return false;
- }
-
- if ((this.authority == null) || (key.authority == null)) {
- return this.authority == null && key.authority == null;
- }
- return this.authority.equals(key.authority);
- }
- return false;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int hashCode() {
- if (this.scheme != null) {
- return this.scheme.hashCode();
- }
-
- if (this.authority != null) {
- return this.authority.hashCode();
- }
-
- return super.hashCode();
- }
- }
-
- /**
* Data structure mapping file system keys (scheme + authority) to cached file system objects.
*/
private static final Map<FSKey, FileSystem> CACHE = new HashMap<FSKey, FileSystem>();
@@ -193,7 +134,7 @@ public abstract class FileSystem {
static {
FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
- FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
+ FSDIRECTORY.put("file", LocalFileSystem.class.getName());
}
/**
@@ -405,6 +346,11 @@ public abstract class FileSystem {
return hadoopWrapper.getHadoopWrapperClassNameForFileSystem(scheme);
}
+
+ // ------------------------------------------------------------------------
+ // File System Methods
+ // ------------------------------------------------------------------------
+
/**
* Returns the path of the file system's current working directory.
*
@@ -577,6 +523,16 @@ public abstract class FileSystem {
*/
public abstract boolean rename(Path src, Path dst) throws IOException;
+ /**
+ * Returns true if this is a distributed file system, false otherwise.
+ *
+ * @return True, if this is a distributed file system, false otherwise.
+ */
+ public abstract boolean isDistributedFS();
+
+ // ------------------------------------------------------------------------
+ // output directory initialization
+ // ------------------------------------------------------------------------
/**
* Initializes output directories on local file systems according to the given write mode.
@@ -814,15 +770,63 @@ public abstract class FileSystem {
}
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private static Class<? extends FileSystem> getFileSystemByName(String className) throws ClassNotFoundException {
+ return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class);
+ }
+
/**
- * Returns true if this is a distributed file system, false otherwise.
- *
- * @return True if this is a distributed file system, false otherwise.
+ * An identifier of a file system, via its scheme and its authority.
+ * This class needs to stay public, because it is detected as part of the public API.
*/
- public abstract boolean isDistributedFS();
+ public static class FSKey {
+ /** The scheme of the file system. */
+ private final String scheme;
- private static Class<? extends FileSystem> getFileSystemByName(String className) throws ClassNotFoundException {
- return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class);
+ /** The authority of the file system. */
+ @Nullable
+ private final String authority;
+
+ /**
+ * Creates a file system key from a given scheme and an authority.
+ *
+ * @param scheme The scheme of the file system
+ * @param authority The authority of the file system
+ */
+ public FSKey(String scheme, @Nullable String authority) {
+ this.scheme = checkNotNull(scheme, "scheme");
+ this.authority = authority;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj != null && obj.getClass() == FSKey.class) {
+ final FSKey that = (FSKey) obj;
+ return this.scheme.equals(that.scheme) &&
+ (this.authority == null ? that.authority == null :
+ (that.authority != null && this.authority.equals(that.authority)));
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * scheme.hashCode() +
+ (authority == null ? 17 : authority.hashCode());
+ }
+
+ @Override
+ public String toString() {
+ return scheme + "://" + authority;
+ }
}
}