You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:28 UTC
[14/19] flink git commit: [FLINK-5812] [core] Cleanups in FileSystem (round 1)
[FLINK-5812] [core] Cleanups in FileSystem (round 1)
- This makes the FileSystem use the 'WriteMode' (otherwise it was an unused enumeration)
- Extends comments
- Deprecate the method that controls the replication factor and block size
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1bfae95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1bfae95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1bfae95
Branch: refs/heads/master
Commit: a1bfae95fec8d076ef90d5a36ffa32d3870870d8
Parents: 31c26e3
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 15 17:10:53 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/io/FileOutputFormat.java | 4 +-
.../org/apache/flink/core/fs/FileSystem.java | 82 +++++++++++++++++---
.../core/fs/SafetyNetWrapperFileSystem.java | 6 +-
.../flink/core/fs/local/LocalFileSystem.java | 26 ++++---
.../flink/util/AbstractCloseableRegistry.java | 15 ++--
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 4 +-
.../flink/runtime/fs/maprfs/MapRFileSystem.java | 4 +-
7 files changed, 104 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 0ab12df..1382f06 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -104,7 +104,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
protected Path outputFilePath;
/**
- * The write mode of the output.
+ * The write mode of the output.
*/
private WriteMode writeMode;
@@ -249,7 +249,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;
// create output file
- this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
+ this.stream = fs.create(this.actualFilePath, writeMode);
// at this point, the file creation must have succeeded, or an exception has been thrown
this.fileCreated = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index c3828fb..4149d5e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +60,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* machine-local file system). Other file system types are accessed by an implementation that bridges
* to the suite of file systems supported by Hadoop (such as for example HDFS).
*
- * <h2>Data Persistence</h2>
+ * <h2>Scope and Purpose</h2>
+ *
+ * The purpose of this abstraction is to expose a common and well defined interface for
+ * access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing
+ * state and recovery data) and by reusable built-in connectors (file sources / sinks).
+ *
+ * <p>The purpose of this abstraction is <b>not</b> to give user programs an abstraction with
+ * extreme flexibility and control across all possible file systems. That mission would be a folly,
+ * as the differences in characteristics of even the most common file systems are already quite
+ * large. It is expected that user programs that need specialized functionality of certain file systems
+ * in their functions, operations, sources, or sinks instantiate the specialized file system adapters
+ * directly.
+ *
+ * <h2>Data Persistence Contract</h2>
*
* The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
* both for results of streaming applications and for fault tolerance and recovery. It is therefore
@@ -152,6 +166,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* in between read or write operations, because there are no guarantees about the visibility of
* operations across threads (many operations do not create memory fences).
*
+ * <h2>Streams Safety Net</h2>
+ *
+ * When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via
+ * {@link Path#getFileSystem()}), the FileSystem instantiates a safety net for that FileSystem.
+ * The safety net ensures that all streams created from the FileSystem are closed when the
+ * application task finishes (or is canceled or failed). That way, the task's threads do not
+ * leak connections.
+ *
* @see FSDataInputStream
* @see FSDataOutputStream
*/
@@ -164,11 +186,13 @@ public abstract class FileSystem {
*/
public enum WriteMode {
- /** Creates the target file if it does not exist. Does not overwrite existing files and directories. */
+ /** Creates the target file only if no file exists at that path already.
+ * Does not overwrite existing files and directories. */
NO_OVERWRITE,
- /** Creates a new target file regardless of any existing files or directories. Existing files and
- * directories will be removed/overwritten. */
+ /** Creates a new target file regardless of any existing files or directories.
+ * Existing files and directories will be deleted (recursively) automatically before
+ * creating the new file. */
OVERWRITE
}
@@ -555,7 +579,6 @@ public abstract class FileSystem {
* source file
*/
public boolean exists(final Path f) throws IOException {
-
try {
return (getFileStatus(f) != null);
} catch (FileNotFoundException e) {
@@ -590,6 +613,11 @@ public abstract class FileSystem {
/**
* Opens an FSDataOutputStream at the indicated Path.
+ *
+ * <p>This method is deprecated, because most of its parameters are ignored by most file systems.
+ * To control, for example, the replication factor and block size in the Hadoop Distributed File System,
+ * make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
+ * or in the classpath of either Flink or the user code.
*
* @param f
* the file name to open
@@ -602,8 +630,15 @@ public abstract class FileSystem {
* required block replication for the file.
* @param blockSize
* the size of the file blocks
- * @throws IOException
+ *
+ * @throws IOException Thrown, if the stream could not be opened because of an I/O error, or because
+ * a file already exists at that path and the write mode indicates to not
+ * overwrite the file.
+ *
+ * @deprecated Deprecated because not well supported across types of file systems.
+ * Control the behavior of specific file systems via configurations instead.
*/
+ @Deprecated
public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize) throws IOException;
@@ -615,9 +650,34 @@ public abstract class FileSystem {
* @param overwrite
* if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
- * @throws IOException
+ *
+ * @throws IOException Thrown, if the stream could not be opened because of an I/O error, or because
+ * a file already exists at that path and the write mode indicates to not
+ * overwrite the file.
+ *
+ * @deprecated Use {@link #create(Path, WriteMode)} instead.
+ */
+ @Deprecated
+ public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+ return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
+ }
+
+ /**
+ * Opens an FSDataOutputStream to a new file at the given path.
+ *
+ * <p>If the file already exists, the behavior depends on the given {@code WriteMode}.
+ * If the mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an
+ * exception.
+ *
+ * @param f The file path to write to
+ * @param overwriteMode The action to take if a file or directory already exists at the given path.
+ * @return The stream to the new file at the target path.
+ *
+ * @throws IOException Thrown, if the stream could not be opened because of an I/O error, or because
+ * a file already exists at that path and the write mode indicates to not
+ * overwrite the file.
*/
- public abstract FSDataOutputStream create(Path f, boolean overwrite) throws IOException;
+ public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
/**
* Renames the file/directory src to dst.
@@ -632,7 +692,9 @@ public abstract class FileSystem {
public abstract boolean rename(Path src, Path dst) throws IOException;
/**
- * Returns true if this is a distributed file system, false otherwise.
+ * Returns true if this is a distributed file system. A distributed file system here means
+ * that the file system is shared among all Flink processes that participate in a cluster or
+ * job and that all these processes can see the same files.
*
* @return True, if this is a distributed file system, false otherwise.
*/
@@ -911,7 +973,7 @@ public abstract class FileSystem {
* An identifier of a file system, via its scheme and its authority.
* This class needs to stay public, because it is detected as part of the public API.
*/
- public static class FSKey {
+ private static final class FSKey {
/** The scheme of the file system. */
private final String scheme;
http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 63e6253..1dacafd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -30,8 +30,8 @@ import java.net.URI;
* {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and (ii) registers them to
* a {@link SafetyNetCloseableRegistry}.
*
- * Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to prevent resource leaks
- * from unclosed streams.
+ * <p>Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to
+ * prevent resource leaks from unclosed streams.
*/
@Internal
public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem> {
@@ -120,7 +120,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
}
@Override
- public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+ public FSDataOutputStream create(Path f, WriteMode overwrite) throws IOException {
FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite);
return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index acbf814..12aeb7f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -43,9 +43,12 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.file.FileAlreadyExistsException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The class <code>LocalFile</code> provides an implementation of the {@link FileSystem} interface
+ * The class {@code LocalFileSystem} is an implementation of the {@link FileSystem} interface
* for the local file system of the machine where the JVM runs.
*/
@Internal
@@ -231,28 +234,27 @@ public class LocalFileSystem extends FileSystem {
return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
}
-
@Override
- public FSDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
- final short replication, final long blockSize) throws IOException {
+ public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
+ checkNotNull(filePath, "filePath");
- if (exists(f) && !overwrite) {
- throw new IOException("File already exists:" + f);
+ if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
+ throw new FileAlreadyExistsException("File already exists: " + filePath);
}
- final Path parent = f.getParent();
+ final Path parent = filePath.getParent();
if (parent != null && !mkdirs(parent)) {
- throw new IOException("Mkdirs failed to create " + parent.toString());
+ throw new IOException("Mkdirs failed to create " + parent);
}
- final File file = pathToFile(f);
+ final File file = pathToFile(filePath);
return new LocalDataOutputStream(file);
}
-
@Override
- public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
- return create(f, overwrite, 0, (short) 0, 0);
+ public FSDataOutputStream create(
+ Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+ return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index e165d97..2b7a8c8 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -27,10 +27,10 @@ import java.util.Map;
/**
* This is the abstract base class for registries that allow to register instances of {@link Closeable}, which are all
* closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
+ *
+ * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable}.
+ *
+ * <p>All methods in this class are thread-safe.
*
* @param <C> Type of the closeable this registers
* @param <T> Type for potential meta data associated with the registering closeables
@@ -51,7 +51,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
* {@link IllegalStateException} and closes the passed {@link Closeable}.
*
* @param closeable Closeable tor register
- * @return true if the the Closeable was newly added to the registry
+ *
* @throws IOException exception when the registry was closed before
*/
public final void registerClosable(C closeable) throws IOException {
@@ -74,7 +74,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
* Removes a {@link Closeable} from the registry.
*
* @param closeable instance to remove from the registry.
- * @return true, if the instance was actually registered and now removed
*/
public final void unregisterClosable(C closeable) {
@@ -109,6 +108,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
return closeableToRef;
}
+ // ------------------------------------------------------------------------
+ //
+ // ------------------------------------------------------------------------
+
protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap);
protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 36dfa55..1371d21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -417,9 +417,9 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
@Override
- public HadoopDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
+ public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
- .create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
+ .create(new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
return new HadoopDataOutputStream(fsDataOutputStream);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index a7ef441..57eea6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -327,11 +327,11 @@ public final class MapRFileSystem extends FileSystem {
}
@Override
- public FSDataOutputStream create(final Path f, final boolean overwrite)
+ public FSDataOutputStream create(final Path f, final WriteMode overwrite)
throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
- new org.apache.hadoop.fs.Path(f.toString()), overwrite);
+ new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
return new HadoopDataOutputStream(fdos);
}