You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:28 UTC

[14/19] flink git commit: [FLINK-5812] [core] Cleanups in FileSystem (round 1)

[FLINK-5812] [core] Cleanups in FileSystem (round 1)

  - This makes the FileSystem use the 'WriteMode' (otherwise it was an unused enumeration)
  - Extends comments
  - Deprecate the method that controls the replication factor and block size


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1bfae95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1bfae95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1bfae95

Branch: refs/heads/master
Commit: a1bfae95fec8d076ef90d5a36ffa32d3870870d8
Parents: 31c26e3
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 15 17:10:53 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileOutputFormat.java   |  4 +-
 .../org/apache/flink/core/fs/FileSystem.java    | 82 +++++++++++++++++---
 .../core/fs/SafetyNetWrapperFileSystem.java     |  6 +-
 .../flink/core/fs/local/LocalFileSystem.java    | 26 ++++---
 .../flink/util/AbstractCloseableRegistry.java   | 15 ++--
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  4 +-
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |  4 +-
 7 files changed, 104 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 0ab12df..1382f06 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -104,7 +104,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
 	protected Path outputFilePath;
 	
 	/**
-	 * The write mode of the output.	
+	 * The write mode of the output.
 	 */
 	private WriteMode writeMode;
 	
@@ -249,7 +249,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
 		this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;
 
 		// create output file
-		this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
+		this.stream = fs.create(this.actualFilePath, writeMode);
 		
 		// at this point, the file creation must have succeeded, or an exception has been thrown
 		this.fileCreated = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index c3828fb..4149d5e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +60,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * machine-local file system). Other file system types are accessed by an implementation that bridges
  * to the suite of file systems supported by Hadoop (such as for example HDFS).
  * 
- * <h2>Data Persistence</h2>
+ * <h2>Scope and Purpose</h2>
+ * 
+ * The purpose of this abstraction is to expose a common and well-defined interface for
+ * access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing
+ * state and recovery data) and by reusable built-in connectors (file sources / sinks).
+ * 
+ * <p>The purpose of this abstraction is <b>not</b> to give user programs an abstraction with
+ * extreme flexibility and control across all possible file systems. That mission would be a folly,
+ * as the differences in characteristics of even the most common file systems are already quite
+ * large. It is expected that user programs that need specialized functionality of certain file systems
+ * in their functions, operations, sources, or sinks instantiate the specialized file system adapters
+ * directly.
+ * 
+ * <h2>Data Persistence Contract</h2>
  * 
  * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
  * both for results of streaming applications and for fault tolerance and recovery. It is therefore
@@ -152,6 +166,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * in between read or write operations, because there are no guarantees about the visibility of
  * operations across threads (many operations do not create memory fences).
  * 
+ * <h2>Streams Safety Net</h2>
+ * 
+ * When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via
+ * {@link Path#getFileSystem()}), the FileSystem instantiates a safety net for that FileSystem.
+ * The safety net ensures that all streams created from the FileSystem are closed when the
+ * application task finishes (or is canceled or failed). That way, the task's threads do not
+ * leak connections.
+ * 
  * @see FSDataInputStream
  * @see FSDataOutputStream
  */
@@ -164,11 +186,13 @@ public abstract class FileSystem {
 	 */
 	public enum WriteMode {
 
-		/** Creates the target file if it does not exist. Does not overwrite existing files and directories. */
+		/** Creates the target file only if no file exists at that path already.
+		 * Does not overwrite existing files and directories. */
 		NO_OVERWRITE,
 
-		/** Creates a new target file regardless of any existing files or directories. Existing files and
-		 * directories will be removed/overwritten. */
+		/** Creates a new target file regardless of any existing files or directories.
+		 * Existing files and directories will be deleted (recursively) automatically before
+		 * creating the new file. */
 		OVERWRITE
 	}
 
@@ -555,7 +579,6 @@ public abstract class FileSystem {
 	 *        source file
 	 */
 	public boolean exists(final Path f) throws IOException {
-
 		try {
 			return (getFileStatus(f) != null);
 		} catch (FileNotFoundException e) {
@@ -590,6 +613,11 @@ public abstract class FileSystem {
 
 	/**
 	 * Opens an FSDataOutputStream at the indicated Path.
+	 * 
+	 * <p>This method is deprecated, because most of its parameters are ignored by most file systems.
+	 * To control for example the replication factor and block size in the Hadoop Distributed File System,
+	 * make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
+	 * or in the classpath of either Flink or the user code.
 	 *
 	 * @param f
 	 *        the file name to open
@@ -602,8 +630,15 @@ public abstract class FileSystem {
 	 *        required block replication for the file.
 	 * @param blockSize
 	 *        the size of the file blocks
-	 * @throws IOException
+	 * 
+	 * @throws IOException Thrown, if the stream could not be opened because of an I/O error, or because
+	 *                     a file already exists at that path and the write mode indicates to not
+	 *                     overwrite the file.
+	 * 
+	 * @deprecated Deprecated because not well supported across types of file systems.
+	 *             Control the behavior of specific file systems via configurations instead. 
 	 */
+	@Deprecated
 	public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
 			long blockSize) throws IOException;
 
@@ -615,9 +650,34 @@ public abstract class FileSystem {
 	 * @param overwrite
 	 *        if a file with this name already exists, then if true,
 	 *        the file will be overwritten, and if false an error will be thrown.
-	 * @throws IOException
+	 * 
+	 * @throws IOException Thrown, if the stream could not be opened because of an I/O error, or because
+	 *                     a file already exists at that path and the write mode indicates to not
+	 *                     overwrite the file.
+	 * 
+	 * @deprecated Use {@link #create(Path, WriteMode)} instead.
+	 */
+	@Deprecated
+	public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+		return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
+	}
+
+	/**
+	 * Opens an FSDataOutputStream to a new file at the given path.
+	 * 
+	 * <p>If the file already exists, the behavior depends on the given {@code WriteMode}.
+	 * If the mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an
+	 * exception.
+	 *
+	 * @param f The file path to write to
+	 * @param overwriteMode The action to take if a file or directory already exists at the given path.
+	 * @return The stream to the new file at the target path.
+	 * 
+	 * @throws IOException Thrown, if the stream could not be opened because of an I/O error, or because
+	 *                     a file already exists at that path and the write mode indicates to not
+	 *                     overwrite the file.
 	 */
-	public abstract FSDataOutputStream create(Path f, boolean overwrite) throws IOException;
+	public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
 
 	/**
 	 * Renames the file/directory src to dst.
@@ -632,7 +692,9 @@ public abstract class FileSystem {
 	public abstract boolean rename(Path src, Path dst) throws IOException;
 
 	/**
-	 * Returns true if this is a distributed file system, false otherwise.
+	 * Returns true if this is a distributed file system. A distributed file system here means
+	 * that the file system is shared among all Flink processes that participate in a cluster or
+	 * job and that all these processes can see the same files.
 	 *
 	 * @return True, if this is a distributed file system, false otherwise.
 	 */
@@ -911,7 +973,7 @@ public abstract class FileSystem {
 	 * An identifier of a file system, via its scheme and its authority.
 	 * This class needs to stay public, because it is detected as part of the public API.
 	 */
-	public static class FSKey {
+	private static final class FSKey {
 
 		/** The scheme of the file system. */
 		private final String scheme;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 63e6253..1dacafd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -30,8 +30,8 @@ import java.net.URI;
  * {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and (ii) registers them to
  * a {@link SafetyNetCloseableRegistry}.
  *
- * Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to prevent resource leaks
- * from unclosed streams.
+ * <p>Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to
+ * prevent resource leaks from unclosed streams.
  */
 @Internal
 public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem> {
@@ -120,7 +120,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
 	}
 
 	@Override
-	public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+	public FSDataOutputStream create(Path f, WriteMode overwrite) throws IOException {
 		FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite);
 		return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index acbf814..12aeb7f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -43,9 +43,12 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.file.FileAlreadyExistsException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The class <code>LocalFile</code> provides an implementation of the {@link FileSystem} interface
+ * The class {@code LocalFileSystem} is an implementation of the {@link FileSystem} interface
  * for the local file system of the machine where the JVM runs.
  */
 @Internal
@@ -231,28 +234,27 @@ public class LocalFileSystem extends FileSystem {
 		return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
 	}
 
-
 	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
-			final short replication, final long blockSize) throws IOException {
+	public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
+		checkNotNull(filePath, "filePath");
 
-		if (exists(f) && !overwrite) {
-			throw new IOException("File already exists:" + f);
+		if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
+			throw new FileAlreadyExistsException("File already exists: " + filePath);
 		}
 
-		final Path parent = f.getParent();
+		final Path parent = filePath.getParent();
 		if (parent != null && !mkdirs(parent)) {
-			throw new IOException("Mkdirs failed to create " + parent.toString());
+			throw new IOException("Mkdirs failed to create " + parent);
 		}
 
-		final File file = pathToFile(f);
+		final File file = pathToFile(filePath);
 		return new LocalDataOutputStream(file);
 	}
 
-
 	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
-		return create(f, overwrite, 0, (short) 0, 0);
+	public FSDataOutputStream create(
+			Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+		return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index e165d97..2b7a8c8 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -27,10 +27,10 @@ import java.util.Map;
 /**
  * This is the abstract base class for registries that allow to register instances of {@link Closeable}, which are all
  * closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
+ * 
+ * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable}.
+ * 
+ * <p>All methods in this class are thread-safe.
  *
  * @param <C> Type of the closeable this registers
  * @param <T> Type for potential meta data associated with the registering closeables
@@ -51,7 +51,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	 * {@link IllegalStateException} and closes the passed {@link Closeable}.
 	 *
 	 * @param closeable Closeable tor register
-	 * @return true if the the Closeable was newly added to the registry
+	 * 
 	 * @throws IOException exception when the registry was closed before
 	 */
 	public final void registerClosable(C closeable) throws IOException {
@@ -74,7 +74,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 	 * Removes a {@link Closeable} from the registry.
 	 *
 	 * @param closeable instance to remove from the registry.
-	 * @return true, if the instance was actually registered and now removed
 	 */
 	public final void unregisterClosable(C closeable) {
 
@@ -109,6 +108,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 		return closeableToRef;
 	}
 
+	// ------------------------------------------------------------------------
+	//  
+	// ------------------------------------------------------------------------
+
 	protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
 
 	protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 36dfa55..1371d21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -417,9 +417,9 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 
 	@Override
-	public HadoopDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
+	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
 		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
-			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
+			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
 		return new HadoopDataOutputStream(fsDataOutputStream);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index a7ef441..57eea6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -327,11 +327,11 @@ public final class MapRFileSystem extends FileSystem {
 	}
 
 	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite)
+	public FSDataOutputStream create(final Path f, final WriteMode overwrite)
 			throws IOException {
 
 		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-				new org.apache.hadoop.fs.Path(f.toString()), overwrite);
+				new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
 
 		return new HadoopDataOutputStream(fdos);
 	}