You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:11 UTC

[04/12] flink git commit: [hotfix] Clean up structure and comments in 'FileSystem'

[hotfix] Clean up structure and comments in 'FileSystem'

This commit aims to improve the readability of the FileSystem class.
The commit does not introduce new/different code, but introduces sections in the class, moves nested
classes and methods between these sections.
The commit also improves comments for the class and nested classes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ecf6c86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ecf6c86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ecf6c86

Branch: refs/heads/master
Commit: 7ecf6c86de2a782ba0aa34439c3387da82f42808
Parents: 3560f2e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Dec 13 16:54:39 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:32 2016 +0100

----------------------------------------------------------------------
 flink-core/pom.xml                              |   1 +
 .../org/apache/flink/core/fs/FileSystem.java    | 194 ++++++++++---------
 2 files changed, 100 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ecf6c86/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ffbfe70..396ef86 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -125,6 +125,7 @@ under the License.
 					<parameter>
 						<excludes combine.children="append">
 							<exclude>org.apache.flink.api.common.ExecutionConfig#CONFIG_KEY</exclude>
+							<exclude>org.apache.flink.core.fs.FileSystem$FSKey</exclude>
 							<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
 						</excludes>
 					</parameter>

http://git-wip-us.apache.org/repos/asf/flink/blob/7ecf6c86/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 433cec0..3dced6f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -28,11 +28,14 @@ package org.apache.flink.core.fs;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -42,17 +45,38 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * An abstract base class for a fairly generic file system. It
- * may be implemented as a distributed file system, or as a local
- * one that reflects the locally-connected disk.
+ * Abstract base class of all file systems used by Flink. This class may be extended to implement
+ * distributed file systems, or local file systems. The abstraction by this file system is very simple,
+ * and teh set of allowed operations quite limited, to support the common denominator of a wide
+ * range of file systems. For example, appending to or mutating existing files is not supported.
+ * 
+ * <p>Flink implements and supports some file system types directly (for example the default
+ * machine-local file system). Other file system types are accessed by an implementation that bridges
+ * to the suite of file systems supported by Hadoop (such as for example HDFS).
  */
 @Public
 public abstract class FileSystem {
 
-	private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
+	/**
+	 * The possible write modes. The write mode decides what happens if a file should be created,
+	 * but already exists.
+	 */
+	public enum WriteMode {
+
+		/** Creates the target file if it does not exist. Does not overwrite existing files and directories. */
+		NO_OVERWRITE,
+
+		/** Creates a new target file regardless of any existing files or directories. Existing files and
+		 * directories will be removed/overwritten. */
+		OVERWRITE
+	}
 
-	private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
+	// ------------------------------------------------------------------------
+
+	private static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>();
 
 	private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
@@ -98,89 +122,6 @@ public abstract class FileSystem {
 	private static final Object SYNCHRONIZATION_OBJECT = new Object();
 
 	/**
-	 * Enumeration for write modes.
-	 *
-	 */
-	public enum WriteMode {
-
-		/** Creates write path if it does not exist. Does not overwrite existing files and directories. */
-		NO_OVERWRITE,
-
-		/** Creates write path if it does not exist. Overwrites existing files and directories. */
-		OVERWRITE
-	}
-
-	/**
-	 * An auxiliary class to identify a file system by its scheme and its authority.
-	 */
-	public static class FSKey {
-
-		/**
-		 * The scheme of the file system.
-		 */
-		private String scheme;
-
-		/**
-		 * The authority of the file system.
-		 */
-		private String authority;
-
-		/**
-		 * Creates a file system key from a given scheme and an
-		 * authority.
-		 *
-		 * @param scheme
-		 *        the scheme of the file system
-		 * @param authority
-		 *        the authority of the file system
-		 */
-		public FSKey(final String scheme, final String authority) {
-			this.scheme = scheme;
-			this.authority = authority;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public boolean equals(final Object obj) {
-			if (obj == this) {
-				return true;
-			}
-
-			if (obj instanceof FSKey) {
-				final FSKey key = (FSKey) obj;
-
-				if (!this.scheme.equals(key.scheme)) {
-					return false;
-				}
-
-				if ((this.authority == null) || (key.authority == null)) {
-					return this.authority == null && key.authority == null;
-				}
-				return this.authority.equals(key.authority);
-			}
-			return false;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public int hashCode() {
-			if (this.scheme != null) {
-				return this.scheme.hashCode();
-			}
-
-			if (this.authority != null) {
-				return this.authority.hashCode();
-			}
-
-			return super.hashCode();
-		}
-	}
-
-	/**
 	 * Data structure mapping file system keys (scheme + authority) to cached file system objects.
 	 */
 	private static final Map<FSKey, FileSystem> CACHE = new HashMap<FSKey, FileSystem>();
@@ -193,7 +134,7 @@ public abstract class FileSystem {
 	static {
 		FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
-		FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
+		FSDIRECTORY.put("file", LocalFileSystem.class.getName());
 	}
 
 	/**
@@ -405,6 +346,11 @@ public abstract class FileSystem {
 		return hadoopWrapper.getHadoopWrapperClassNameForFileSystem(scheme);
 	}
 
+
+	// ------------------------------------------------------------------------
+	//  File System Methods
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Returns the path of the file system's current working directory.
 	 *
@@ -577,6 +523,16 @@ public abstract class FileSystem {
 	 */
 	public abstract boolean rename(Path src, Path dst) throws IOException;
 
+	/**
+	 * Returns true if this is a distributed file system, false otherwise.
+	 *
+	 * @return True, if this is a distributed file system, false otherwise.
+	 */
+	public abstract boolean isDistributedFS();
+
+	// ------------------------------------------------------------------------
+	//  output directory initialization
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Initializes output directories on local file systems according to the given write mode.
@@ -814,15 +770,63 @@ public abstract class FileSystem {
 
 	}
 
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static Class<? extends FileSystem> getFileSystemByName(String className) throws ClassNotFoundException {
+		return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class);
+	}
+
 	/**
-	 * Returns true if this is a distributed file system, false otherwise.
-	 *
-	 * @return True if this is a distributed file system, false otherwise.
+	 * An identifier of a file system, via its scheme and its authority.
+	 * This class needs to stay public, because it is detected as part of the public API.
 	 */
-	public abstract boolean isDistributedFS();
+	public static class FSKey {
 
+		/** The scheme of the file system. */
+		private final String scheme;
 
-	private static Class<? extends FileSystem> getFileSystemByName(String className) throws ClassNotFoundException {
-		return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class);
+		/** The authority of the file system. */
+		@Nullable
+		private final String authority;
+
+		/**
+		 * Creates a file system key from a given scheme and an authority.
+		 *
+		 * @param scheme     The scheme of the file system
+		 * @param authority  The authority of the file system
+		 */
+		public FSKey(String scheme, @Nullable String authority) {
+			this.scheme = checkNotNull(scheme, "scheme");
+			this.authority = authority;
+		}
+
+		@Override
+		public boolean equals(final Object obj) {
+			if (obj == this) {
+				return true;
+			}
+			else if (obj != null && obj.getClass() == FSKey.class) {
+				final FSKey that = (FSKey) obj;
+				return this.scheme.equals(that.scheme) &&
+						(this.authority == null ? that.authority == null :
+								(that.authority != null && this.authority.equals(that.authority)));
+			}
+			else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * scheme.hashCode() + 
+					(authority == null ? 17 : authority.hashCode());
+		}
+
+		@Override
+		public String toString() {
+			return scheme + "://" + authority;
+		}
 	}
 }