You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/24 12:02:51 UTC
[1/3] flink git commit: [hotfix] [core] Fix lots of checkstyle errors
in core.fs
Repository: flink
Updated Branches:
refs/heads/release-1.4 3b58038d6 -> a11e2cf0b
[hotfix] [core] Fix lots of checkstyle errors in core.fs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5e156f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5e156f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5e156f7
Branch: refs/heads/release-1.4
Commit: b5e156f79ae7e9cd2f8d5008f0c350e10ad4a821
Parents: 3b58038
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 8 20:14:34 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 24 10:47:23 2017 +0100
----------------------------------------------------------------------
.../core/fs/AbstractMultiFSDataInputStream.java | 8 +-
.../org/apache/flink/core/fs/BlockLocation.java | 6 +-
.../apache/flink/core/fs/CloseableRegistry.java | 8 +-
.../flink/core/fs/ClosingFSDataInputStream.java | 4 +-
.../core/fs/ClosingFSDataOutputStream.java | 4 +-
.../apache/flink/core/fs/FSDataInputStream.java | 6 +-
.../flink/core/fs/FSDataInputStreamWrapper.java | 2 +-
.../flink/core/fs/FSDataOutputStream.java | 40 +++---
.../core/fs/FSDataOutputStreamWrapper.java | 2 +-
.../apache/flink/core/fs/FileInputSplit.java | 30 ++--
.../org/apache/flink/core/fs/FileStatus.java | 19 ++-
.../org/apache/flink/core/fs/FileSystem.java | 137 ++++++++++---------
.../apache/flink/core/fs/FileSystemFactory.java | 2 +-
.../java/org/apache/flink/core/fs/Path.java | 59 ++++----
.../flink/core/fs/UnsupportedSchemeFactory.java | 4 +-
.../flink/core/fs/WrappingProxyCloseable.java | 4 +-
16 files changed, 163 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
index a161ceb..e01ac2e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
@@ -32,13 +32,13 @@ import java.io.IOException;
@Internal
public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
- /** Inner stream for the currently accessed segment of the virtual global stream */
+ /** Inner stream for the currently accessed segment of the virtual global stream. */
protected FSDataInputStream delegate;
- /** Position in the virtual global stream */
+ /** Position in the virtual global stream. */
protected long totalPos;
- /** Total available bytes in the virtual global stream */
+ /** Total available bytes in the virtual global stream. */
protected long totalAvailable;
public AbstractMultiFSDataInputStream() {
@@ -48,7 +48,7 @@ public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
@Override
public void seek(long desired) throws IOException {
- if(desired == totalPos) {
+ if (desired == totalPos) {
return;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
index dff0c3e..fcdf905 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
@@ -30,7 +30,7 @@ public interface BlockLocation extends Comparable<BlockLocation> {
/**
* Get the list of hosts (hostname) hosting this block.
- *
+ *
* @return A list of hosts (hostname) hosting this block.
* @throws IOException
* thrown if the list of hosts could not be retrieved
@@ -39,14 +39,14 @@ public interface BlockLocation extends Comparable<BlockLocation> {
/**
* Get the start offset of the file associated with this block.
- *
+ *
* @return The start offset of the file associated with this block.
*/
long getOffset();
/**
* Get the length of the block.
- *
+ *
* @return the length of the block
*/
long getLength();
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 87d33d2..5f1c9fb 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -29,10 +29,10 @@ import java.util.Map;
/**
* This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
+ *
+ * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
+ *
+ * <p>All methods in this class are thread-safe.
*/
@Internal
public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
index 173a890..1a62f62 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -26,8 +26,8 @@ import java.io.IOException;
/**
* This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataInputStream} that is used to
* implement a safety net against unclosed streams.
- * <p>
- * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ *
+ * <p>See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
*/
@Internal
public class ClosingFSDataInputStream
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
index cb7de92..0f252a4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -26,8 +26,8 @@ import java.io.IOException;
/**
* This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataOutputStream} that is used to
* implement a safety net against unclosed streams.
- * <p>
- * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ *
+ * <p>See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
*/
@Internal
public class ClosingFSDataOutputStream
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
index 44dbcb1..fa931c6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
@@ -25,7 +25,7 @@ import java.io.InputStream;
/**
* Interface for a data input stream to a file on a {@link FileSystem}.
- *
+ *
* <p>This extends the {@link java.io.InputStream} with methods for accessing
* the stream's {@link #getPos() current position} and
* {@link #seek(long) seeking} to a desired position.
@@ -36,7 +36,7 @@ public abstract class FSDataInputStream extends InputStream {
/**
* Seek to the given offset from the start of the file. The next read() will be from that location.
* Can't seek past the end of the file.
- *
+ *
* @param desired
* the desired offset
* @throws IOException Thrown if an error occurred while seeking inside the input stream.
@@ -47,7 +47,7 @@ public abstract class FSDataInputStream extends InputStream {
* Gets the current position in the input stream.
*
* @return current position in the input stream
- * @throws IOException Thrown if an I/O error occurred in the underlying stream
+ * @throws IOException Thrown if an I/O error occurred in the underlying stream
* implementation while accessing the stream's position.
*/
public abstract long getPos() throws IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
index d2eb9f2..6a3874f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.WrappingProxy;
import java.io.IOException;
/**
- * Simple forwarding wrapper around {@link FSDataInputStream}
+ * Simple forwarding wrapper around {@link FSDataInputStream}.
*/
@Internal
public class FSDataInputStreamWrapper extends FSDataInputStream implements WrappingProxy<FSDataInputStream> {
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
index a8df5c1..4a6e01d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
@@ -26,22 +26,22 @@ import java.io.OutputStream;
/**
* An output stream to a file that is created via a {@link FileSystem}.
* This class extends the base {@link java.io.OutputStream} with some additional important methods.
- *
+ *
* <h2>Data Persistence Guarantees</h2>
- *
- * These streams are used to persistently store data, both for results of streaming applications
+ *
+ * <p>These streams are used to persistently store data, both for results of streaming applications
* and for fault tolerance and recovery. It is therefore crucial that the persistence semantics
* of these streams are well defined.
- *
+ *
* <p>Please refer to the class-level docs of {@link FileSystem} for the definition of data persistence
* via Flink's FileSystem abstraction and the {@code FSDataOutputStream}.
- *
+ *
* <h2>Thread Safety</h2>
- *
- * Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread safe.
+ *
+ * <p>Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread safe.
* Instances of {@code FSDataOutputStream} should not be passed between threads, because there
* are no guarantees about the order of visibility of operations across threads.
- *
+ *
* @see FileSystem
* @see FSDataInputStream
*/
@@ -52,13 +52,13 @@ public abstract class FSDataOutputStream extends OutputStream {
* Gets the position of the stream (non-negative), defined as the number of bytes
* from the beginning of the file to the current writing position. The position
* corresponds to the zero-based index of the next byte that will be written.
- *
+ *
 * <p>This method must accurately report the current position of the stream.
* Various components of the high-availability and recovery logic rely on the accurate
- *
+ *
* @return The current position in the stream, defined as the number of bytes
* from the beginning of the file to the current writing position.
- *
+ *
* @throws IOException Thrown if an I/O error occurs while obtaining the position from
* the stream implementation.
*/
@@ -68,14 +68,14 @@ public abstract class FSDataOutputStream extends OutputStream {
* Flushes the stream, writing any data currently buffered in stream implementation
* to the proper output stream. After this method has been called, the stream implementation
* must not hold onto any buffered data any more.
- *
+ *
* <p>A completed flush does not mean that the data is necessarily persistent. Data
 * persistence can only be assumed after calls to {@link #close()} or {@link #sync()}.
- *
+ *
* <p>Implementation note: This overrides the method defined in {@link OutputStream}
* as abstract to force implementations of the {@code FSDataOutputStream} to implement
* this method directly.
- *
+ *
* @throws IOException Thrown if an I/O error occurs while flushing the stream.
*/
public abstract void flush() throws IOException;
@@ -84,9 +84,7 @@ public abstract class FSDataOutputStream extends OutputStream {
* Flushes the data all the way to the persistent non-volatile storage (for example disks).
* The method behaves similar to the <i>fsync</i> function, forcing all data to
* be persistent on the devices.
- *
- * <p>
- *
+ *
* @throws IOException Thrown if an I/O error occurs
*/
public abstract void sync() throws IOException;
@@ -95,20 +93,20 @@ public abstract class FSDataOutputStream extends OutputStream {
* Closes the output stream. After this method returns, the implementation must guarantee
* that all data written to the stream is persistent/visible, as defined in the
* {@link FileSystem class-level docs}.
- *
+ *
* <p>The above implies that the method must block until persistence can be guaranteed.
* For example for distributed replicated file systems, the method must block until the
* replication quorum has been reached. If the calling thread is interrupted in the
* process, it must fail with an {@code IOException} to indicate that persistence cannot
* be guaranteed.
- *
+ *
* <p>If this method throws an exception, the data in the stream cannot be assumed to be
* persistent.
- *
+ *
* <p>Implementation note: This overrides the method defined in {@link OutputStream}
* as abstract to force implementations of the {@code FSDataOutputStream} to implement
* this method directly.
- *
+ *
* @throws IOException Thrown, if an error occurred while closing the stream or guaranteeing
* that the data is persistent.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
index f015012..21e68ef 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.WrappingProxy;
import java.io.IOException;
/**
- * Simple forwarding wrapper around {@link FSDataInputStream}
+ * Simple forwarding wrapper around {@link FSDataOutputStream}.
*/
@Internal
public class FSDataOutputStreamWrapper extends FSDataOutputStream implements WrappingProxy<FSDataOutputStream> {
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index 8af0a20..bef13fa 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
/**
* A file input split provides information on a particular part of a file, possibly
- * hosted on a distributed file system and replicated among several hosts.
+ * hosted on a distributed file system and replicated among several hosts.
*/
@Public
public class FileInputSplit extends LocatableInputSplit {
@@ -34,16 +34,16 @@ public class FileInputSplit extends LocatableInputSplit {
private final Path file;
/** The position of the first byte in the file to process. */
- private long start;
+ private final long start;
/** The number of bytes in the file to process. */
- private long length;
+ private final long length;
// --------------------------------------------------------------------------------------------
-
+
/**
* Constructs a split with host information.
- *
+ *
* @param num
* the number of this input split
* @param file
@@ -57,17 +57,17 @@ public class FileInputSplit extends LocatableInputSplit {
*/
public FileInputSplit(int num, Path file, long start, long length, String[] hosts) {
super(num, hosts);
-
+
this.file = file;
this.start = start;
this.length = length;
}
// --------------------------------------------------------------------------------------------
-
+
/**
* Returns the path of the file containing this split's data.
- *
+ *
* @return the path of the file containing this split's data.
*/
public Path getPath() {
@@ -76,7 +76,7 @@ public class FileInputSplit extends LocatableInputSplit {
/**
* Returns the position of the first byte in the file to process.
- *
+ *
* @return the position of the first byte in the file to process
*/
public long getStart() {
@@ -85,20 +85,20 @@ public class FileInputSplit extends LocatableInputSplit {
/**
* Returns the number of bytes in the file to process.
- *
+ *
* @return the number of bytes in the file to process
*/
public long getLength() {
return length;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public int hashCode() {
return getSplitNumber() ^ (file == null ? 0 : file.hashCode());
}
-
+
@Override
public boolean equals(Object obj) {
if (obj == this) {
@@ -106,7 +106,7 @@ public class FileInputSplit extends LocatableInputSplit {
}
else if (obj != null && obj instanceof FileInputSplit && super.equals(obj)) {
FileInputSplit other = (FileInputSplit) obj;
-
+
return this.start == other.start &&
this.length == other.length &&
(this.file == null ? other.file == null : (other.file != null && this.file.equals(other.file)));
@@ -115,7 +115,7 @@ public class FileInputSplit extends LocatableInputSplit {
return false;
}
}
-
+
@Override
public String toString() {
return "[" + getSplitNumber() + "] " + file + ":" + start + "+" + length;
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
index 8b62659..f9794e6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
@@ -20,7 +20,7 @@
/**
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
+ * additional information regarding copyright ownership.
*/
package org.apache.flink.core.fs;
@@ -30,56 +30,55 @@ import org.apache.flink.annotation.Public;
/**
* Interface that represents the client side information for a file
* independent of the file system.
- *
*/
@Public
public interface FileStatus {
/**
- * Return the length of this file
- *
+ * Return the length of this file.
+ *
* @return the length of this file
*/
long getLen();
/**
 * Get the block size of the file.
- *
+ *
* @return the number of bytes
*/
long getBlockSize();
/**
* Get the replication factor of a file.
- *
+ *
* @return the replication factor of a file.
*/
short getReplication();
/**
* Get the modification time of the file.
- *
+ *
* @return the modification time of file in milliseconds since January 1, 1970 UTC.
*/
long getModificationTime();
/**
* Get the access time of the file.
- *
+ *
* @return the access time of file in milliseconds since January 1, 1970 UTC.
*/
long getAccessTime();
/**
* Checks if this object represents a directory.
- *
+ *
* @return <code>true</code> if this is a directory, <code>false</code> otherwise
*/
boolean isDir();
/**
* Returns the corresponding Path to the FileStatus.
- *
+ *
* @return the corresponding Path to the FileStatus
*/
Path getPath();
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 982e496..7a8245a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
/*
* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -38,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -55,40 +55,40 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* distributed file systems, or local file systems. The abstraction by this file system is very simple,
* and the set of available operations quite limited, to support the common denominator of a wide
* range of file systems. For example, appending to or mutating existing files is not supported.
- *
+ *
* <p>Flink implements and supports some file system types directly (for example the default
* machine-local file system). Other file system types are accessed by an implementation that bridges
* to the suite of file systems supported by Hadoop (such as for example HDFS).
- *
+ *
* <h2>Scope and Purpose</h2>
- *
- * The purpose of this abstraction is used to expose a common and well defined interface for
+ *
+ * <p>The purpose of this abstraction is to expose a common and well defined interface for
* access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing
* state and recovery data) and by reusable built-in connectors (file sources / sinks).
- *
+ *
* <p>The purpose of this abstraction is <b>not</b> to give user programs an abstraction with
* extreme flexibility and control across all possible file systems. That mission would be a folly,
* as the differences in characteristics of even the most common file systems are already quite
* large. It is expected that user programs that need specialized functionality of certain file systems
* in their functions, operations, sources, or sinks instantiate the specialized file system adapters
* directly.
- *
+ *
* <h2>Data Persistence Contract</h2>
- *
- * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
+ *
+ * <p>The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
* both for results of streaming applications and for fault tolerance and recovery. It is therefore
* crucial that the persistence semantics of these streams are well defined.
- *
+ *
* <h3>Definition of Persistence Guarantees</h3>
- *
- * Data written to an output stream is considered persistent, if two requirements are met:
- *
+ *
+ * <p>Data written to an output stream is considered persistent, if two requirements are met:
+ *
* <ol>
* <li><b>Visibility Requirement:</b> It must be guaranteed that all other processes, machines,
* virtual machines, containers, etc. that are able to access the file see the data consistently
* when given the absolute file path. This requirement is similar to the <i>close-to-open</i>
* semantics defined by POSIX, but restricted to the file itself (by its absolute path).</li>
- *
+ *
* <li><b>Durability Requirement:</b> The file system's specific durability/persistence requirements
* must be met. These are specific to the particular file system. For example the
* {@link LocalFileSystem} does not provide any durability guarantees for crashes of both
@@ -101,14 +101,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* listing the directory contents) are not required to be complete for the data in the file stream
* to be considered persistent. This relaxation is important for file systems where updates to
* directory contents are only eventually consistent.
- *
+ *
* <p>The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes
* once the call to {@link FSDataOutputStream#close()} returns.
*
* <h3>Examples</h3>
*
* <ul>
- * <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once
+ * <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once
* it has been received and acknowledged by the file system, typically by having been replicated
* to a quorum of machines (<i>durability requirement</i>). In addition the absolute file path
* must be visible to all other machines that will potentially access the file (<i>visibility
@@ -125,12 +125,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <li>A <b>local file system</b> must support the POSIX <i>close-to-open</i> semantics.
* Because the local file system does not have any fault tolerance guarantees, no further
* requirements exist.
- *
+ *
* <p>The above implies specifically that data may still be in the OS cache when considered
 * persistent from the local file system's perspective. Crashes that cause the OS cache to lose
* data are considered fatal to the local machine and are not covered by the local file system's
* guarantees as defined by Flink.
- *
+ *
* <p>That means that computed results, checkpoints, and savepoints that are written only to
* the local filesystem are not guaranteed to be recoverable from the local machine's failure,
* making local file systems unsuitable for production setups.</li>
@@ -138,14 +138,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <h2>Updating File Contents</h2>
*
- * Many file systems either do not support overwriting contents of existing files at all, or do
+ * <p>Many file systems either do not support overwriting contents of existing files at all, or do
* not support consistent visibility of the updated contents in that case. For that reason,
* Flink's FileSystem does not support appending to existing files, or seeking within output streams
* so that previously written data could be overwritten.
*
* <h2>Overwriting Files</h2>
*
- * Overwriting files is in general possible. A file is overwritten by deleting it and creating
+ * <p>Overwriting files is in general possible. A file is overwritten by deleting it and creating
* a new file. However, certain filesystems cannot make that change synchronously visible
* to all parties that have access to the file.
* For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only
@@ -154,29 +154,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>To avoid these consistency issues, the implementations of failure/recovery mechanisms in
* Flink strictly avoid writing to the same file path more than once.
- *
+ *
* <h2>Thread Safety</h2>
- *
- * Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem
+ *
+ * <p>Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem
* is frequently shared across multiple threads in Flink and must be able to concurrently
* create input/output streams and list file metadata.
- *
+ *
 * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream} implementations are strictly
* <b>not thread-safe</b>. Instances of the streams should also not be passed between threads
* in between read or write operations, because there are no guarantees about the visibility of
* operations across threads (many operations do not create memory fences).
- *
+ *
* <h2>Streams Safety Net</h2>
- *
- * When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via
+ *
+ * <p>When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via
* {@link Path#getFileSystem()}), the FileSystem instantiates a safety net for that FileSystem.
* The safety net ensures that all streams created from the FileSystem are closed when the
* application task finishes (or is canceled or failed). That way, the task's threads do not
* leak connections.
- *
+ *
* <p>Internal runtime code can explicitly obtain a FileSystem that does not use the safety
* net via {@link FileSystem#getUnguardedFileSystem(URI)}.
- *
+ *
* @see FSDataInputStream
* @see FSDataOutputStream
*/
@@ -201,7 +201,7 @@ public abstract class FileSystem {
// ------------------------------------------------------------------------
- /** Logger for all FileSystem work */
+ /** Logger for all FileSystem work. */
private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
@@ -223,14 +223,13 @@ public abstract class FileSystem {
/** The default filesystem scheme to be used, configured during process-wide initialization.
* This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
private static URI DEFAULT_SCHEME;
-
// ------------------------------------------------------------------------
// Initialization
// ------------------------------------------------------------------------
/**
- * Initializes the shared file system settings.
+ * Initializes the shared file system settings.
*
* <p>The given configuration is passed to each file system factory to initialize the respective
* file systems. Because the configuration of file systems may be different subsequent to the call
@@ -351,7 +350,7 @@ public abstract class FileSystem {
}
}
- // print a helpful pointer for malformed local URIs (happens a lot to new users)
+ // print a helpful pointer for malformed local URIs (happens a lot to new users)
if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();
@@ -382,7 +381,7 @@ public abstract class FileSystem {
}
catch (UnsupportedFileSystemSchemeException e) {
throw new UnsupportedFileSystemSchemeException(
- "Could not find a file system implementation for scheme '" + uri.getScheme() +
+ "Could not find a file system implementation for scheme '" + uri.getScheme() +
"'. The scheme is not directly supported by Flink and no Hadoop file " +
"system to support this scheme could be loaded.", e);
}
@@ -479,7 +478,7 @@ public abstract class FileSystem {
* Return the number of bytes that large input files should be optimally be split into to minimize I/O time.
*
* @return the number of bytes that large input files should be optimally be split into to minimize I/O time
- *
+ *
* @deprecated This value is no longer used and is meaningless.
*/
@Deprecated
@@ -539,7 +538,7 @@ public abstract class FileSystem {
/**
* Opens an FSDataOutputStream at the indicated Path.
- *
+ *
* <p>This method is deprecated, because most of its parameters are ignored by most file systems.
* To control for example the replication factor and block size in the Hadoop Distributed File system,
* make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
@@ -556,13 +555,13 @@ public abstract class FileSystem {
* required block replication for the file.
* @param blockSize
* the size of the file blocks
- *
+ *
* @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
* a file already exists at that path and the write mode indicates to not
* overwrite the file.
- *
+ *
* @deprecated Deprecated because not well supported across types of file systems.
- * Control the behavior of specific file systems via configurations instead.
+ * Control the behavior of specific file systems via configurations instead.
*/
@Deprecated
public FSDataOutputStream create(
@@ -583,11 +582,11 @@ public abstract class FileSystem {
* @param overwrite
* if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
- *
+ *
* @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
* a file already exists at that path and the write mode indicates to not
* overwrite the file.
- *
+ *
* @deprecated Use {@link #create(Path, WriteMode)} instead.
*/
@Deprecated
@@ -597,7 +596,7 @@ public abstract class FileSystem {
/**
* Opens an FSDataOutputStream to a new file at the given path.
- *
+ *
* <p>If the file already exists, the behavior depends on the given {@code WriteMode}.
* If the mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an
* exception.
@@ -605,7 +604,7 @@ public abstract class FileSystem {
* @param f The file path to write to
* @param overwriteMode The action to take if a file or directory already exists at the given path.
* @return The stream to the new file at the target path.
- *
+ *
* @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
* a file already exists at that path and the write mode indicates to not
* overwrite the file.
@@ -674,7 +673,7 @@ public abstract class FileSystem {
* </ul>
* </li>
* </ul>
- *
+ *
* <p>Files contained in an existing directory are not deleted, because multiple instances of a
* DataSinkTask might call this function at the same time and hence might perform concurrent
* delete operations on the file system (possibly deleting output files of concurrently running tasks).
@@ -684,7 +683,7 @@ public abstract class FileSystem {
* @param outPath Output path that should be prepared.
* @param writeMode Write mode to consider.
* @param createDirectory True, to initialize a directory at the given path, false to prepare space for a file.
- *
+ *
* @return True, if the path was successfully prepared, false otherwise.
* @throws IOException Thrown, if any of the file system access operations failed.
*/
@@ -708,7 +707,7 @@ public abstract class FileSystem {
// restore the interruption state
Thread.currentThread().interrupt();
- // leave the method - we don't have the lock anyways
+ // leave the method - we don't have the lock anyways
throw new IOException("The thread was interrupted while trying to initialize the output directory");
}
@@ -733,7 +732,7 @@ public abstract class FileSystem {
} else {
// file may not be overwritten
throw new IOException("File or directory already exists. Existing files and directories " +
- "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
+ "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
}
@@ -748,7 +747,7 @@ public abstract class FileSystem {
delete(outPath, true);
}
catch (IOException e) {
- throw new IOException("Could not remove existing directory '" + outPath +
+ throw new IOException("Could not remove existing directory '" + outPath +
"' to allow overwrite by result file", e);
}
}
@@ -798,27 +797,27 @@ public abstract class FileSystem {
/**
* Initializes output directories on distributed file systems according to the given write mode.
*
- * WriteMode.NO_OVERWRITE & parallel output:
+ * <p>WriteMode.NO_OVERWRITE & parallel output:
* - A directory is created if the output path does not exist.
* - An existing file or directory raises an exception.
*
- * WriteMode.NO_OVERWRITE & NONE parallel output:
+ * <p>WriteMode.NO_OVERWRITE & NONE parallel output:
* - An existing file or directory raises an exception.
*
- * WriteMode.OVERWRITE & parallel output:
+ * <p>WriteMode.OVERWRITE & parallel output:
* - A directory is created if the output path does not exist.
* - An existing directory and its content is deleted and a new directory is created.
* - An existing file is deleted and replaced by a new directory.
*
- * WriteMode.OVERWRITE & NONE parallel output:
+ * <p>WriteMode.OVERWRITE & NONE parallel output:
* - An existing file or directory is deleted and replaced by a new directory.
*
* @param outPath Output path that should be prepared.
* @param writeMode Write mode to consider.
* @param createDirectory True, to initialize a directory at the given path, false otherwise.
- *
+ *
* @return True, if the path was successfully prepared, false otherwise.
- *
+ *
* @throws IOException Thrown, if any of the file system access operations failed.
*/
public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
@@ -841,7 +840,7 @@ public abstract class FileSystem {
// restore the interruption state
Thread.currentThread().interrupt();
- // leave the method - we don't have the lock anyways
+ // leave the method - we don't have the lock anyways
throw new IOException("The thread was interrupted while trying to initialize the output directory");
}
@@ -850,13 +849,13 @@ public abstract class FileSystem {
if (exists(outPath)) {
// path exists, check write mode
switch(writeMode) {
-
+
case NO_OVERWRITE:
// file or directory may not be overwritten
throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
" mode to overwrite existing files and directories.");
-
+
case OVERWRITE:
// output path exists. We delete it and all contained files in case of a directory.
try {
@@ -867,12 +866,12 @@ public abstract class FileSystem {
// this will be handled later.
}
break;
-
+
default:
- throw new IllegalArgumentException("Invalid write mode: "+writeMode);
+ throw new IllegalArgumentException("Invalid write mode: " + writeMode);
}
}
-
+
if (createDirectory) {
// Output directory needs to be created
try {
@@ -881,10 +880,10 @@ public abstract class FileSystem {
}
} catch (IOException ioe) {
// Some other thread might already have created the directory.
- // If - for some other reason - the directory could not be created
+ // If - for some other reason - the directory could not be created
// and the path does not exist, this will be handled later.
}
-
+
// double check that the output directory exists
return exists(outPath) && getFileStatus(outPath).isDir();
}
@@ -906,7 +905,7 @@ public abstract class FileSystem {
* Aside from the {@link LocalFileSystem}, these file systems are loaded
* via Java's service framework.
*
- * @return A map from the file system scheme to corresponding file system factory.
+ * @return A map from the file system scheme to corresponding file system factory.
*/
private static HashMap<String, FileSystemFactory> loadFileSystems() {
final HashMap<String, FileSystemFactory> map = new HashMap<>();
@@ -935,7 +934,7 @@ public abstract class FileSystem {
// catching Throwable here to handle various forms of class loading
// and initialization errors
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
- LOG.error("Failed to load a file systems via services", t);
+ LOG.error("Failed to load a file system via services", t);
}
}
}
@@ -948,7 +947,7 @@ public abstract class FileSystem {
return map;
}
-
+
/**
* Utility loader for the Hadoop file system factory.
* We treat the Hadoop FS factory in a special way, because we use it as a catch
@@ -963,10 +962,12 @@ public abstract class FileSystem {
// first, see if the Flink runtime classes are available
final Class<? extends FileSystemFactory> factoryClass;
try {
- factoryClass = Class.forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl).asSubclass(FileSystemFactory.class);
+ factoryClass = Class
+ .forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl)
+ .asSubclass(FileSystemFactory.class);
}
catch (ClassNotFoundException e) {
- LOG.info("No Flink runtime dependency present. " +
+ LOG.info("No Flink runtime dependency present. " +
"The extended set of supported File Systems via Hadoop is not available.");
return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
}
@@ -1039,7 +1040,7 @@ public abstract class FileSystem {
@Override
public int hashCode() {
- return 31 * scheme.hashCode() +
+ return 31 * scheme.hashCode() +
(authority == null ? 17 : authority.hashCode());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
index 982da35..8a35471 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
@@ -58,4 +58,4 @@ public interface FileSystemFactory {
* @throws IOException Thrown if the file system could not be instantiated.
*/
FileSystem create(URI fsUri) throws IOException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 53290ed..6398aa8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -22,26 +22,26 @@
package org.apache.flink.core.fs;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.StringUtils;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
/**
* Names a file or directory in a {@link FileSystem}. Path strings use slash as
* the directory separator. A path string is absolute if it begins with a slash.
*
- * Tailing slashes are removed from the path.
+ * <p>Trailing slashes are removed from the path.
*/
@Public
public class Path implements IOReadableWritable, Serializable {
-
+
private static final long serialVersionUID = 1L;
/**
@@ -71,7 +71,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Constructs a path object from a given URI.
- *
+ *
* @param uri
* the URI to construct the path object from
*/
@@ -81,7 +81,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Resolve a child path against a parent path.
- *
+ *
* @param parent
* the parent path
* @param child
@@ -93,7 +93,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Resolve a child path against a parent path.
- *
+ *
* @param parent
* the parent path
* @param child
@@ -105,7 +105,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Resolve a child path against a parent path.
- *
+ *
* @param parent
* the parent path
* @param child
@@ -117,7 +117,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Resolve a child path against a parent path.
- *
+ *
* @param parent
* the parent path
* @param child
@@ -168,7 +168,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Construct a path from a String. Path strings are URIs, but with unescaped
* elements and some additional normalization.
- *
+ *
* @param pathString
* the string to construct a path from
*/
@@ -214,7 +214,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Construct a Path from a scheme, an authority and a path string.
- *
+ *
* @param scheme
* the scheme string
* @param authority
@@ -229,7 +229,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Initializes a path object given the scheme, authority and path string.
- *
+ *
* @param scheme
* the scheme string.
* @param authority
@@ -247,7 +247,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Normalizes a path string.
- *
+ *
* @param path
* the path string to normalize
* @return the normalized path string
@@ -262,10 +262,10 @@ public class Path implements IOReadableWritable, Serializable {
path = path.replaceAll("/+", "/");
// remove tailing separator
- if(!path.equals(SEPARATOR) && // UNIX root path
+ if (!path.equals(SEPARATOR) && // UNIX root path
!path.matches("/\\p{Alpha}+:/") && // Windows root path
- path.endsWith(SEPARATOR))
- {
+ path.endsWith(SEPARATOR)) {
+
// remove tailing slash
path = path.substring(0, path.length() - SEPARATOR.length());
}
@@ -275,7 +275,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Converts the path object to a {@link URI}.
- *
+ *
* @return the {@link URI} object converted from the path object
*/
public URI toUri() {
@@ -284,7 +284,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Returns the FileSystem that owns this Path.
- *
+ *
* @return the FileSystem that owns this Path
* @throws IOException
* thrown if the file system could not be retrieved
@@ -295,7 +295,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Checks if the directory of this path is absolute.
- *
+ *
* @return <code>true</code> if the directory of this path is absolute, <code>false</code> otherwise
*/
public boolean isAbsolute() {
@@ -305,7 +305,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Returns the final component of this path, i.e., everything that follows the last separator.
- *
+ *
* @return the final component of the path
*/
public String getName() {
@@ -325,7 +325,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Returns the parent of a path, i.e., everything that precedes the last separator
* or <code>null</code> if at root.
- *
+ *
* @return the parent of a path or <code>null</code> if at root.
*/
public Path getParent() {
@@ -348,7 +348,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Adds a suffix to the final name in the path.
- *
+ *
* @param suffix The suffix to be added
* @return the new path including the suffix
*/
@@ -381,7 +381,6 @@ public class Path implements IOReadableWritable, Serializable {
return buffer.toString();
}
-
@Override
public boolean equals(Object o) {
if (!(o instanceof Path)) {
@@ -391,7 +390,6 @@ public class Path implements IOReadableWritable, Serializable {
return this.uri.equals(that.uri);
}
-
@Override
public int hashCode() {
return uri.hashCode();
@@ -404,7 +402,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Returns the number of elements in this path.
- *
+ *
* @return the number of elements in this path
*/
public int depth() {
@@ -420,7 +418,7 @@ public class Path implements IOReadableWritable, Serializable {
/**
* Returns a qualified path object.
- *
+ *
* @param fs
* the FileSystem that should be used to obtain the current working directory
* @return the qualified path object
@@ -479,7 +477,6 @@ public class Path implements IOReadableWritable, Serializable {
}
}
-
@Override
public void write(DataOutputView out) throws IOException {
if (uri == null) {
@@ -516,7 +513,7 @@ public class Path implements IOReadableWritable, Serializable {
* the path to check
* @param slashed
* true to indicate the first character of the string is a slash, false otherwise
- *
+ *
* @return <code>true</code> if the path string contains a windows drive letter, false otherwise
*/
private boolean hasWindowsDrive(String path, boolean slashed) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
index 234b49f..c2cb2d5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
@@ -20,11 +20,9 @@ package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.URI;
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
index 9f100ef..e1c5a7d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
@@ -27,6 +27,4 @@ import java.io.Closeable;
* {@link WrappingProxy} for {@link Closeable} that is also closeable.
*/
@Internal
-public interface WrappingProxyCloseable<T extends Closeable> extends Closeable, WrappingProxy<T> {
-
-}
+public interface WrappingProxyCloseable<T extends Closeable> extends Closeable, WrappingProxy<T> {}
[2/3] flink git commit: [FLINK-8125] [core] Introduce limiting of
outgoing file system connections
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
new file mode 100644
index 0000000..509b4ae
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
@@ -0,0 +1,742 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link LimitedConnectionsFileSystem}.
+ */
+public class LimitedConnectionsFileSystemTest {
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testConstructionNumericOverflow() {
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // unlimited total
+ Integer.MAX_VALUE, // unlimited outgoing
+ Integer.MAX_VALUE, // unlimited incoming
+ Long.MAX_VALUE - 1, // long timeout, close to overflow
+ Long.MAX_VALUE - 1); // long timeout, close to overflow
+
+ assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenStreamsTotal());
+ assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenOutputStreams());
+ assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenInputStreams());
+
+ assertTrue(limitedFs.getStreamOpenTimeout() > 0);
+ assertTrue(limitedFs.getStreamInactivityTimeout() > 0);
+ }
+
+ @Test
+ public void testLimitingOutputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 61;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // unlimited total
+ maxConcurrentOpen, // limited outgoing
+ Integer.MAX_VALUE, // unlimited incoming
+ 0,
+ 0);
+
+ final WriterThread[] threads = new WriterThread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ Path path = new Path(tempFolder.newFile().toURI());
+ threads[i] = new WriterThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ for (WriterThread t : threads) {
+ t.start();
+ }
+
+ for (WriterThread t : threads) {
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testLimitingInputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 61;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // unlimited total
+ Integer.MAX_VALUE, // unlimited outgoing
+ maxConcurrentOpen, // limited incoming
+ 0,
+ 0);
+
+ final Random rnd = new Random();
+
+ final ReaderThread[] threads = new ReaderThread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ Path path = new Path(file.toURI());
+ threads[i] = new ReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ for (ReaderThread t : threads) {
+ t.start();
+ }
+
+ for (ReaderThread t : threads) {
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testLimitingMixedStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 61;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen); // limited total
+
+ final Random rnd = new Random();
+
+ final CheckedThread[] threads = new CheckedThread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ File file = tempFolder.newFile();
+ Path path = new Path(file.toURI());
+
+ if (rnd.nextBoolean()) {
+ // reader thread
+ createRandomContents(file, rnd);
+ threads[i] = new ReaderThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+ }
+ else {
+ threads[i] = new WriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+ }
+ }
+
+ for (CheckedThread t : threads) {
+ t.start();
+ }
+
+ for (CheckedThread t : threads) {
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testOpenTimeoutOutputStreams() throws Exception {
+ final long openTimeout = 50L;
+ final int maxConcurrentOpen = 2;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen, // limited total
+ openTimeout, // small opening timeout
+ 0L); // infinite inactivity timeout
+
+ // create the threads that block all streams
+ final BlockingWriterThread[] threads = new BlockingWriterThread[maxConcurrentOpen];
+ for (int i = 0; i < maxConcurrentOpen; i++) {
+ Path path = new Path(tempFolder.newFile().toURI());
+ threads[i] = new BlockingWriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+ threads[i].start();
+ }
+
+ // wait until all are open
+ while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
+ Thread.sleep(1);
+ }
+
+ // try to open another thread
+ try {
+ limitedFs.create(new Path(tempFolder.newFile().toURI()), WriteMode.OVERWRITE);
+ fail("this should have timed out");
+ }
+ catch (IOException e) {
+ // expected
+ }
+
+ // clean shutdown
+ for (BlockingWriterThread t : threads) {
+ t.wakeup();
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testOpenTimeoutInputStreams() throws Exception {
+ final long openTimeout = 50L;
+ final int maxConcurrentOpen = 2;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen, // limited total
+ openTimeout, // small opening timeout
+ 0L); // infinite inactivity timeout
+
+ // create the threads that block all streams
+ final Random rnd = new Random();
+ final BlockingReaderThread[] threads = new BlockingReaderThread[maxConcurrentOpen];
+ for (int i = 0; i < maxConcurrentOpen; i++) {
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ Path path = new Path(file.toURI());
+ threads[i] = new BlockingReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+ threads[i].start();
+ }
+
+ // wait until all are open
+ while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
+ Thread.sleep(1);
+ }
+
+ // try to open another thread
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ try {
+ limitedFs.open(new Path(file.toURI()));
+ fail("this should have timed out");
+ }
+ catch (IOException e) {
+ // expected
+ }
+
+ // clean shutdown
+ for (BlockingReaderThread t : threads) {
+ t.wakeup();
+ t.sync();
+ }
+ }
+
+ @Test
+ public void testTerminateStalledOutputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 20;
+
+ // this testing file system has a 50 ms stream inactivity timeout
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // no limit on total streams
+ maxConcurrentOpen, // limit on output streams
+ Integer.MAX_VALUE, // no limit on input streams
+ 0,
+ 50); // timeout of 50 ms
+
+ final WriterThread[] threads = new WriterThread[numThreads];
+ final BlockingWriterThread[] blockers = new BlockingWriterThread[numThreads];
+
+ for (int i = 0; i < numThreads; i++) {
+ Path path1 = new Path(tempFolder.newFile().toURI());
+ Path path2 = new Path(tempFolder.newFile().toURI());
+
+ threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ // start normal and blocker threads
+ for (int i = 0; i < numThreads; i++) {
+ blockers[i].start();
+ threads[i].start();
+ }
+
+ // all normal threads need to be able to finish because
+ // the blockers eventually time out
+ for (WriterThread t : threads) {
+ try {
+ t.sync();
+ } catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+ // also the regular threads may occasionally get a timeout on
+ // slower test machines because we set very aggressive timeouts
+ // to reduce the test time
+ }
+ }
+
+ // unblock all the blocking threads
+ for (BlockingThread t : blockers) {
+ t.wakeup();
+ }
+ for (BlockingThread t : blockers) {
+ try {
+ t.sync();
+ }
+ catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+ }
+ }
+
+ @Test
+ public void testTerminateStalledInputStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 20;
+
+ // this testing file system has a 50 ms stream inactivity timeout
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ Integer.MAX_VALUE, // no limit on total streams
+ Integer.MAX_VALUE, // no limit on output streams
+ maxConcurrentOpen, // limit on input streams
+ 0,
+ 50); // timeout of 50 ms
+
+ final Random rnd = new Random();
+
+ final ReaderThread[] threads = new ReaderThread[numThreads];
+ final BlockingReaderThread[] blockers = new BlockingReaderThread[numThreads];
+
+ for (int i = 0; i < numThreads; i++) {
+ File file1 = tempFolder.newFile();
+ File file2 = tempFolder.newFile();
+
+ createRandomContents(file1, rnd);
+ createRandomContents(file2, rnd);
+
+ Path path1 = new Path(file1.toURI());
+ Path path2 = new Path(file2.toURI());
+
+ threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+
+ // start normal and blocker threads
+ for (int i = 0; i < numThreads; i++) {
+ blockers[i].start();
+ threads[i].start();
+ }
+
+ // all normal threads need to be able to finish because
+ // the blockers eventually time out
+ for (ReaderThread t : threads) {
+ try {
+ t.sync();
+ } catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+ // also the regular threads may occasionally get a timeout on
+ // slower test machines because we set very aggressive timeouts
+ // to reduce the test time
+ }
+ }
+
+ // unblock all the blocking threads
+ for (BlockingThread t : blockers) {
+ t.wakeup();
+ }
+ for (BlockingThread t : blockers) {
+ try {
+ t.sync();
+ }
+ catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+ }
+ }
+
+ @Test
+ public void testTerminateStalledMixedStreams() throws Exception {
+ final int maxConcurrentOpen = 2;
+ final int numThreads = 20;
+
+ final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(),
+ maxConcurrentOpen, // limited total
+ 0L, // no opening timeout
+ 50L); // inactivity timeout of 50 ms
+
+ final Random rnd = new Random();
+
+ final CheckedThread[] threads = new CheckedThread[numThreads];
+ final BlockingThread[] blockers = new BlockingThread[numThreads];
+
+ for (int i = 0; i < numThreads; i++) {
+ File file1 = tempFolder.newFile();
+ File file2 = tempFolder.newFile();
+ Path path1 = new Path(file1.toURI());
+ Path path2 = new Path(file2.toURI());
+
+ if (rnd.nextBoolean()) {
+ createRandomContents(file1, rnd);
+ createRandomContents(file2, rnd);
+ threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+ else {
+ threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+ blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+ }
+ }
+
+ // start normal and blocker threads
+ for (int i = 0; i < numThreads; i++) {
+ blockers[i].start();
+ threads[i].start();
+ }
+
+ // all normal threads need to be able to finish because
+ // the blockers eventually time out
+ for (CheckedThread t : threads) {
+ try {
+ t.sync();
+ } catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+ // also the regular threads may occasionally get a timeout on
+ // slower test machines because we set very aggressive timeouts
+ // to reduce the test time
+ }
+ }
+
+ // unblock all the blocking threads
+ for (BlockingThread t : blockers) {
+ t.wakeup();
+ }
+ for (BlockingThread t : blockers) {
+ try {
+ t.sync();
+ }
+ catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+ }
+ }
+
+ @Test
+ public void testFailingStreamsUnregister() throws Exception {
+ final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(new FailFs(), 1);
+
+ assertEquals(0, fs.getNumberOfOpenInputStreams());
+ assertEquals(0, fs.getNumberOfOpenOutputStreams());
+ assertEquals(0, fs.getTotalNumberOfOpenStreams());
+
+ try {
+ fs.open(new Path(tempFolder.newFile().toURI()));
+ fail("this is expected to fail with an exception");
+ } catch (IOException e) {
+ // expected
+ }
+
+ try {
+ fs.create(new Path(tempFolder.newFile().toURI()), WriteMode.NO_OVERWRITE);
+ fail("this is expected to fail with an exception");
+ } catch (IOException e) {
+ // expected
+ }
+
+ assertEquals(0, fs.getNumberOfOpenInputStreams());
+ assertEquals(0, fs.getNumberOfOpenOutputStreams());
+ assertEquals(0, fs.getTotalNumberOfOpenStreams());
+ }
+
+ /**
+ * Tests that a slowly written output stream is not accidentally closed too aggressively, due to
+ * a wrong initialization of the timestamps or bytes written that mark when the last progress was checked.
+ */
+ @Test
+ public void testSlowOutputStreamNotClosed() throws Exception {
+ final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
+
+ // some competing threads
+ final Random rnd = new Random();
+ final ReaderThread[] threads = new ReaderThread[10];
+ for (int i = 0; i < threads.length; i++) {
+ File file = tempFolder.newFile();
+ createRandomContents(file, rnd);
+ Path path = new Path(file.toURI());
+ threads[i] = new ReaderThread(fs, path, 1, Integer.MAX_VALUE);
+ }
+
+ // open the stream we test
+ try (FSDataOutputStream out = fs.create(new Path(tempFolder.newFile().toURI()), WriteMode.OVERWRITE)) {
+
+ // start the other threads that will try to shoot this stream down
+ for (ReaderThread t : threads) {
+ t.start();
+ }
+
+ // write the stream slowly.
+ Thread.sleep(5);
+ for (int bytesLeft = 50; bytesLeft > 0; bytesLeft--) {
+ out.write(bytesLeft);
+ Thread.sleep(5);
+ }
+ }
+
+ // wait for clean shutdown
+ for (ReaderThread t : threads) {
+ t.sync();
+ }
+ }
+
+ /**
+ * Tests that a slowly read stream is not accidentally closed too aggressively, due to
+ * a wrong initialization of the timestamps or bytes written that mark when the last progress was checked.
+ */
+ @Test
+ public void testSlowInputStreamNotClosed() throws Exception {
+ final File file = tempFolder.newFile();
+ createRandomContents(file, new Random(), 50);
+
+ final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
+ LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
+
+ // some competing threads
+ final WriterThread[] threads = new WriterThread[10];
+ for (int i = 0; i < threads.length; i++) {
+ Path path = new Path(tempFolder.newFile().toURI());
+ threads[i] = new WriterThread(fs, path, 1, Integer.MAX_VALUE);
+ }
+
+ // open the stream we test
+ try (FSDataInputStream in = fs.open(new Path(file.toURI()))) {
+
+ // start the other threads that will try to shoot this stream down
+ for (WriterThread t : threads) {
+ t.start();
+ }
+
+ // read the stream slowly.
+ Thread.sleep(5);
+ while (in.read() != -1) {
+ Thread.sleep(5);
+ }
+ }
+
+ // wait for clean shutdown
+ for (WriterThread t : threads) {
+ t.sync();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utils
+ // ------------------------------------------------------------------------
+
+ private void createRandomContents(File file, Random rnd) throws IOException {
+ createRandomContents(file, rnd, rnd.nextInt(10000) + 1);
+ }
+
+ private void createRandomContents(File file, Random rnd, int size) throws IOException {
+ final byte[] data = new byte[size];
+ rnd.nextBytes(data);
+
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ fos.write(data);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Testing threads
+ // ------------------------------------------------------------------------
+
+ private static final class WriterThread extends CheckedThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentOutputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ WriterThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentOutputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
+ assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final Random rnd = new Random();
+ final byte[] data = new byte[rnd.nextInt(10000) + 1];
+ rnd.nextBytes(data);
+ stream.write(data);
+ }
+ }
+ }
+
+ private static final class ReaderThread extends CheckedThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentInputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ ReaderThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentInputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentInputStreams = maxConcurrentInputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataInputStream stream = fs.open(path)) {
+ assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final byte[] readBuffer = new byte[4096];
+
+ //noinspection StatementWithEmptyBody
+ while (stream.read(readBuffer) != -1) {}
+ }
+ }
+ }
+
+ private static abstract class BlockingThread extends CheckedThread {
+
+ private final OneShotLatch waiter = new OneShotLatch();
+
+ public void waitTillWokenUp() throws InterruptedException {
+ waiter.await();
+ }
+
+ public void wakeup() {
+ waiter.trigger();
+ }
+ }
+
+ private static final class BlockingWriterThread extends BlockingThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentOutputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ BlockingWriterThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentOutputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
+ assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final Random rnd = new Random();
+ final byte[] data = new byte[rnd.nextInt(10000) + 1];
+ rnd.nextBytes(data);
+ stream.write(data);
+
+ waitTillWokenUp();
+
+ // try to write one more thing, which might/should fail with an I/O exception
+ stream.write(rnd.nextInt());
+ }
+ }
+ }
+
+ private static final class BlockingReaderThread extends BlockingThread {
+
+ private final LimitedConnectionsFileSystem fs;
+
+ private final Path path;
+
+ private final int maxConcurrentInputStreams;
+
+ private final int maxConcurrentStreamsTotal;
+
+ BlockingReaderThread(
+ LimitedConnectionsFileSystem fs,
+ Path path,
+ int maxConcurrentInputStreams,
+ int maxConcurrentStreamsTotal) {
+
+ this.fs = fs;
+ this.path = path;
+ this.maxConcurrentInputStreams = maxConcurrentInputStreams;
+ this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+ }
+
+ @Override
+ public void go() throws Exception {
+
+ try (FSDataInputStream stream = fs.open(path)) {
+ assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
+ assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+ final byte[] readBuffer = new byte[(int) fs.getFileStatus(path).getLen() - 1];
+ assertTrue(stream.read(readBuffer) != -1);
+
+ waitTillWokenUp();
+
+ // try to read one more thing, which might/should fail with an I/O exception
+ //noinspection ResultOfMethodCallIgnored
+ stream.read();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // failing file system
+ // ------------------------------------------------------------------------
+
+ private static class FailFs extends LocalFileSystem {
+
+ @Override
+ public FSDataOutputStream create(Path filePath, WriteMode overwrite) throws IOException {
+ throw new IOException("test exception");
+ }
+
+ @Override
+ public FSDataInputStream open(Path f) throws IOException {
+ throw new IOException("test exception");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
index 50e64e1..2444c65 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
@@ -19,7 +19,10 @@
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
import org.apache.flink.runtime.util.HadoopUtils;
@@ -63,7 +66,7 @@ public class HadoopFsFactory implements FileSystemFactory {
}
@Override
- public HadoopFileSystem create(URI fsUri) throws IOException {
+ public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "fsUri");
final String scheme = fsUri.getScheme();
@@ -162,8 +165,15 @@ public class HadoopFsFactory implements FileSystemFactory {
throw new IOException(message, e);
}
- // all good, return the file system
- return new HadoopFileSystem(hadoopFs);
+ HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);
+
+ // create the Flink file system, optionally limiting the open connections
+ if (flinkConfig != null) {
+ return limitIfConfigured(fs, scheme, flinkConfig);
+ }
+ else {
+ return fs;
+ }
}
catch (ReflectiveOperationException | LinkageError e) {
throw new UnsupportedFileSystemSchemeException("Cannot support file system for '" + fsUri.getScheme() +
@@ -183,4 +193,23 @@ public class HadoopFsFactory implements FileSystemFactory {
"(like for example HDFS NameNode address/port or S3 host). " +
"The attempt to use a configured default authority failed: ";
}
+
+ private static FileSystem limitIfConfigured(HadoopFileSystem fs, String scheme, Configuration config) {
+ final ConnectionLimitingSettings limitSettings = ConnectionLimitingSettings.fromConfig(config, scheme);
+
+ // decorate only if any limit is configured
+ if (limitSettings == null) {
+ // no limit configured
+ return fs;
+ }
+ else {
+ return new LimitedConnectionsFileSystem(
+ fs,
+ limitSettings.limitTotal,
+ limitSettings.limitOutput,
+ limitSettings.limitInput,
+ limitSettings.streamOpenTimeout,
+ limitSettings.streamInactivityTimeout);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
index 1f5c932..4b7592d 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.fs.hdfs;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -39,7 +40,7 @@ public class HadoopFsFactoryTest extends TestLogger {
final URI uri = URI.create("hdfs://localhost:12345/");
HadoopFsFactory factory = new HadoopFsFactory();
- HadoopFileSystem fs = factory.create(uri);
+ FileSystem fs = factory.create(uri);
assertEquals(uri.getScheme(), fs.getUri().getScheme());
assertEquals(uri.getAuthority(), fs.getUri().getAuthority());
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
new file mode 100644
index 0000000..8ab5419
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test that the Hadoop file system wrapper correctly picks up connection limiting
+ * settings for the correct file systems.
+ */
+public class LimitedConnectionsConfigurationTest {
+
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
+ @Test
+ public void testConfiguration() throws Exception {
+
+ // nothing configured, we should get a regular file system
+ FileSystem hdfs = FileSystem.get(URI.create("hdfs://localhost:12345/a/b/c"));
+ FileSystem ftpfs = FileSystem.get(URI.create("ftp://localhost:12345/a/b/c"));
+
+ assertFalse(hdfs instanceof LimitedConnectionsFileSystem);
+ assertFalse(ftpfs instanceof LimitedConnectionsFileSystem);
+
+ // configure some limits, which should cause "fsScheme" to be limited
+
+ final Configuration config = new Configuration();
+ config.setInteger("fs.hdfs.limit.total", 40);
+ config.setInteger("fs.hdfs.limit.input", 39);
+ config.setInteger("fs.hdfs.limit.output", 38);
+ config.setInteger("fs.hdfs.limit.timeout", 23456);
+ config.setInteger("fs.hdfs.limit.stream-timeout", 34567);
+
+ try {
+ FileSystem.initialize(config);
+
+ hdfs = FileSystem.get(URI.create("hdfs://localhost:12345/a/b/c"));
+ ftpfs = FileSystem.get(URI.create("ftp://localhost:12345/a/b/c"));
+
+ assertTrue(hdfs instanceof LimitedConnectionsFileSystem);
+ assertFalse(ftpfs instanceof LimitedConnectionsFileSystem);
+
+ LimitedConnectionsFileSystem limitedFs = (LimitedConnectionsFileSystem) hdfs;
+ assertEquals(40, limitedFs.getMaxNumOpenStreamsTotal());
+ assertEquals(39, limitedFs.getMaxNumOpenInputStreams());
+ assertEquals(38, limitedFs.getMaxNumOpenOutputStreams());
+ assertEquals(23456, limitedFs.getStreamOpenTimeout());
+ assertEquals(34567, limitedFs.getStreamInactivityTimeout());
+ }
+ finally {
+ // clear all settings
+ FileSystem.initialize(new Configuration());
+ }
+ }
+}
[3/3] flink git commit: [FLINK-8125] [core] Introduce limiting of
outgoing file system connections
Posted by se...@apache.org.
[FLINK-8125] [core] Introduce limiting of outgoing file system connections
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a11e2cf0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a11e2cf0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a11e2cf0
Branch: refs/heads/release-1.4
Commit: a11e2cf0b1f37d3ef22e1978e89928fa374960db
Parents: b5e156f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 8 23:57:04 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 24 10:47:33 2017 +0100
----------------------------------------------------------------------
docs/ops/filesystems.md | 126 ++
.../flink/configuration/ConfigConstants.java | 5 +-
.../apache/flink/configuration/CoreOptions.java | 57 +
.../core/fs/ConnectionLimitingFactory.java | 95 ++
.../org/apache/flink/core/fs/FileSystem.java | 46 +-
.../core/fs/LimitedConnectionsFileSystem.java | 1114 ++++++++++++++++++
.../core/fs/local/LocalFileSystemFactory.java | 1 -
.../util/function/SupplierWithException.java | 38 +
.../FilesystemSchemeConfigTest.java | 6 +-
.../fs/LimitedConnectionsConfigurationTest.java | 84 ++
...itedConnectionsFileSystemDelegationTest.java | 241 ++++
.../fs/LimitedConnectionsFileSystemTest.java | 742 ++++++++++++
.../flink/runtime/fs/hdfs/HadoopFsFactory.java | 35 +-
.../runtime/fs/hdfs/HadoopFsFactoryTest.java | 3 +-
.../LimitedConnectionsConfigurationTest.java | 84 ++
15 files changed, 2654 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/docs/ops/filesystems.md
----------------------------------------------------------------------
diff --git a/docs/ops/filesystems.md b/docs/ops/filesystems.md
new file mode 100644
index 0000000..5b2a1e7
--- /dev/null
+++ b/docs/ops/filesystems.md
@@ -0,0 +1,126 @@
+---
+title: "File Systems"
+nav-parent_id: ops
+nav-pos: 12
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This page provides details on setting up and configuring distributed file systems for use with Flink.
+
+## Flink's File System support
+
+Flink uses file systems both as a source and sink in streaming/batch applications, and as a target for checkpointing.
+These file systems can for example be *Unix/Windows file systems*, *HDFS*, or even object stores like *S3*.
+
+The file system used for a specific file is determined by the file URI's scheme. For example `file:///home/user/text.txt` refers to
+a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` refers to a file in a specific HDFS cluster.
+
+File systems are represented via the `org.apache.flink.core.fs.FileSystem` class, which captures the ways to access and modify
+files and objects in that file system. FileSystem instances are instantiated once per process and then cached / pooled, to
+avoid configuration overhead per stream creation, and to enforce certain constraints, like connection/stream limits.
+
+### Built-in File Systems
+
+Flink directly implements the following file systems:
+
+ - **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine,
+including any NFS or SAN that is mounted into that local file system.
+
+ - **S3**: Flink directly provides file systems to talk to Amazon S3, registered under the scheme *"s3://"*.
+There are two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`, based on code from the [Presto project](https://prestodb.io/)
+and the [Hadoop Project](https://hadoop.apache.org/). Both implementations are self-contained with no dependency footprint.
+To use those when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-s3-fs-presto:{{ site.version }}` or `org.apache.flink:flink-s3-fs-hadoop:{{ site.version }}`).
+When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+See [AWS setup](deployment/aws.html) for details.
+
+ - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
+
+### HDFS and Hadoop File System support
+
+For a scheme where Flink does not implement a file system itself, Flink will try to use Hadoop to instantiate a file system for the respective scheme.
+All Hadoop file systems are automatically available once `flink-runtime` and the relevant Hadoop libraries are in classpath.
+
+That way, Flink seamlessly supports all Hadoop file systems, and all Hadoop-compatible file systems (HCFS), for example:
+
+ - **hdfs**
+ - **ftp**
+ - **s3n** and **s3a**
+ - **har**
+ - ...
+
+
+## Common File System configurations
+
+The following configuration settings exist across different file systems
+
+#### Default File System
+
+If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used.
+
+~~~
+fs.default-scheme: <default-fs>
+~~~
+
+For example, if the default file system is configured as `fs.default-scheme: hdfs://localhost:9000/`, then a file path of
+`/user/hugo/in.txt` is interpreted as `hdfs://localhost:9000/user/hugo/in.txt`
+
+#### Connection limiting
+
+You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number
+of concurrent reads / writes or open connections at the same time.
+
+For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.
+
+To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by
+its scheme.
+
+~~~
+fs.<scheme>.limit.total: (number, 0/-1 mean no limit)
+fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
+fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
+fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
+fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
+~~~
+
+You can limit the number of input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on
+the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed.
+If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening will fail.
+
+To prevent inactive streams from taking up the complete pool (preventing new connections to be opened), you can add an inactivity timeout for streams:
+`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes for at least that amount of time, it is forcibly closed.
+
+These limits are enforced per TaskManager, so each TaskManager in a Flink application or cluster will open up to that number of connections.
+In addition, the limits are also enforced only per FileSystem instance. Because File Systems are created per scheme and authority, different
+authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
+
+
+## Adding new File System Implementations
+
+File system implementations are discovered by Flink through Java's service abstraction, making it easy to add additional file system implementations.
+
+In order to add a new File System, the following steps are needed:
+
+ - Add the File System implementation, which is a subclass of `org.apache.flink.core.fs.FileSystem`.
+ - Add a factory that instantiates that file system and declares the scheme under which the FileSystem is registered. This must be a subclass of `org.apache.flink.core.fs.FileSystemFactory`.
+ - Add a service entry. Create a file `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` which contains the class name of your file system factory class.
+
+See the [Java Service Loader docs](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html) for more details on how service loaders work.
+
+{% top %}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index fcf73b8..f80bd9b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -614,7 +614,10 @@ public final class ConfigConstants {
* Key to specify the default filesystem to be used by a job. In the case of
* <code>file:///</code>, which is the default (see {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}),
* the local filesystem is going to be used to resolve URIs without an explicit scheme.
- * */
+ *
+ * @deprecated Use {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME} instead.
+ */
+ @Deprecated
public static final String FILESYSTEM_SCHEME = "fs.default-scheme";
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index e8ab8e4..928e810 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -20,6 +20,9 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
+/**
+ * The set of configuration options for core parameters.
+ */
@PublicEvolving
public class CoreOptions {
@@ -83,4 +86,58 @@ public class CoreOptions {
public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
.key("state.checkpoints.dir")
.noDefaultValue();
+
+ // ------------------------------------------------------------------------
+ // file systems
+ // ------------------------------------------------------------------------
+
+ /**
+ * The default filesystem scheme, used for paths that do not declare a scheme explicitly.
+ */
+ public static final ConfigOption<String> DEFAULT_FILESYSTEM_SCHEME = ConfigOptions
+ .key("fs.default-scheme")
+ .noDefaultValue();
+
+ /**
+ * The total number of input plus output connections that a file system for the given scheme may open.
+ * Unlimited by default.
+ */
+ public static ConfigOption<Integer> fileSystemConnectionLimit(String scheme) {
+ return ConfigOptions.key("fs." + scheme + ".limit.total").defaultValue(-1);
+ }
+
+ /**
+ * The total number of input connections that a file system for the given scheme may open.
+ * Unlimited by default.
+ */
+ public static ConfigOption<Integer> fileSystemConnectionLimitIn(String scheme) {
+ return ConfigOptions.key("fs." + scheme + ".limit.input").defaultValue(-1);
+ }
+
+ /**
+ * The total number of output connections that a file system for the given scheme may open.
+ * Unlimited by default.
+ */
+ public static ConfigOption<Integer> fileSystemConnectionLimitOut(String scheme) {
+ return ConfigOptions.key("fs." + scheme + ".limit.output").defaultValue(-1);
+ }
+
+ /**
+ * If any connection limit is configured, this option can be optionally set to define after
+ * which time (in milliseconds) stream opening fails with a timeout exception, if no stream
+ * connection becomes available. Unlimited timeout by default.
+ */
+ public static ConfigOption<Long> fileSystemConnectionLimitTimeout(String scheme) {
+ return ConfigOptions.key("fs." + scheme + ".limit.timeout").defaultValue(0L);
+ }
+
+ /**
+ * If any connection limit is configured, this option can be optionally set to define after
+ * which time (in milliseconds) inactive streams are reclaimed. This option can help to prevent
+ * that inactive streams make up the full pool of limited connections, and no further connections
+ * can be established. Unlimited timeout by default.
+ */
+ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
+ return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
new file mode 100644
index 0000000..b85a7d6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
 * A wrapping {@link FileSystemFactory} that decorates the file systems it creates with a
 * {@link LimitedConnectionsFileSystem}, to limit the number of concurrently open streams.
 */
@Internal
public class ConnectionLimitingFactory implements FileSystemFactory {

	/** The factory that creates the actual (undecorated) file systems. */
	private final FileSystemFactory factory;

	/** The connection limits applied to every file system created by this factory. */
	private final ConnectionLimitingSettings settings;

	private ConnectionLimitingFactory(
			FileSystemFactory factory,
			ConnectionLimitingSettings settings) {

		this.factory = checkNotNull(factory);
		this.settings = checkNotNull(settings);
	}

	// ------------------------------------------------------------------------

	@Override
	public String getScheme() {
		return factory.getScheme();
	}

	@Override
	public void configure(Configuration config) {
		factory.configure(config);
	}

	@Override
	public FileSystem create(URI fsUri) throws IOException {
		// create the actual file system first, then wrap it with the configured limits
		FileSystem original = factory.create(fsUri);
		return new LimitedConnectionsFileSystem(original,
				settings.limitTotal, settings.limitOutput, settings.limitInput,
				settings.streamOpenTimeout, settings.streamInactivityTimeout);
	}

	// ------------------------------------------------------------------------

	/**
	 * Decorates the given factory for a {@code ConnectionLimitingFactory}, if the given
	 * configuration configured connection limiting for the given file system scheme.
	 * Otherwise, it returns the given factory as is.
	 *
	 * @param factory The factory to potentially decorate.
	 * @param scheme The file scheme for which to check the configuration.
	 * @param config The configuration.
	 *
	 * @return The decorated factory, if connection limiting is configured, the original factory otherwise.
	 */
	public static FileSystemFactory decorateIfLimited(FileSystemFactory factory, String scheme, Configuration config) {
		checkNotNull(factory, "factory");

		final ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(config, scheme);

		// decorate only if any limit is configured
		if (settings == null) {
			// no limit configured
			return factory;
		}
		else {
			return new ConnectionLimitingFactory(factory, settings);
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 7a8245a..18baaa5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -26,8 +26,8 @@ package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystemFactory;
@@ -43,8 +43,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.locks.ReentrantLock;
@@ -214,8 +217,12 @@ public abstract class FileSystem {
/** Cache for file systems, by scheme + authority. */
private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
- /** Mapping of file system schemes to the corresponding implementation factories. */
- private static final HashMap<String, FileSystemFactory> FS_FACTORIES = loadFileSystems();
+ /** All available file system factories. */
+ private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();
+
+ /** Mapping of file system schemes to the corresponding factories,
+ * populated in {@link FileSystem#initialize(Configuration)}. */
+ private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
/** The default factory that is used when no scheme matches. */
private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();
@@ -236,7 +243,7 @@ public abstract class FileSystem {
* of this method, this method clears the file system instance cache.
*
* <p>This method also reads the default file system URI from the configuration key
- * {@link ConfigConstants#FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
+ * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
* the URI has no scheme will be interpreted as relative to that URI.
* As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
* A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
@@ -249,17 +256,22 @@ public abstract class FileSystem {
try {
// make sure file systems are re-instantiated after re-configuration
CACHE.clear();
+ FS_FACTORIES.clear();
// configure all file system factories
- for (FileSystemFactory factory : FS_FACTORIES.values()) {
+ for (FileSystemFactory factory : RAW_FACTORIES) {
factory.configure(config);
+ String scheme = factory.getScheme();
+
+ FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
+ FS_FACTORIES.put(scheme, fsf);
}
// configure the default (fallback) factory
FALLBACK_FACTORY.configure(config);
// also read the default file system scheme
- final String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME, null);
+ final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
if (stringifiedUri == null) {
DEFAULT_SCHEME = null;
}
@@ -269,7 +281,7 @@ public abstract class FileSystem {
}
catch (URISyntaxException e) {
throw new IllegalConfigurationException("The default file system scheme ('" +
- ConfigConstants.FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
+ CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
}
}
}
@@ -368,6 +380,13 @@ public abstract class FileSystem {
}
}
+ // this "default" initialization makes sure that the FileSystem class works
+ // even when not configured with an explicit Flink configuration, like on
+ // JobManager or TaskManager setup
+ if (FS_FACTORIES.isEmpty()) {
+ initialize(new Configuration());
+ }
+
// Try to create a new file system
final FileSystem fs;
final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());
@@ -907,11 +926,11 @@ public abstract class FileSystem {
*
* @return A map from the file system scheme to corresponding file system factory.
*/
- private static HashMap<String, FileSystemFactory> loadFileSystems() {
- final HashMap<String, FileSystemFactory> map = new HashMap<>();
+ private static List<FileSystemFactory> loadFileSystems() {
+ final ArrayList<FileSystemFactory> list = new ArrayList<>();
// by default, we always have the local file system factory
- map.put("file", new LocalFileSystemFactory());
+ list.add(new LocalFileSystemFactory());
LOG.debug("Loading extension file systems via services");
@@ -926,9 +945,8 @@ public abstract class FileSystem {
while (iter.hasNext()) {
try {
FileSystemFactory factory = iter.next();
- String scheme = factory.getScheme();
- map.put(scheme, factory);
- LOG.debug("Added file system {}:{}", scheme, factory.getClass().getName());
+ list.add(factory);
+ LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName());
}
catch (Throwable t) {
// catching Throwable here to handle various forms of class loading
@@ -945,7 +963,7 @@ public abstract class FileSystem {
LOG.error("Failed to load additional file systems via services", t);
}
- return map;
+ return Collections.unmodifiableList(list);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
new file mode 100644
index 0000000..5353563
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
@@ -0,0 +1,1114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * <p>This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and connections
+ * would fail in that case. This happens, for example, for very small HDFS clusters
+ * with few RPC handlers, when a large Flink job tries to build up many connections during
+ * a checkpoint.
+ *
+ * <p>The filesystem may track the progress of streams and close streams that have been
+ * inactive for too long, to avoid locked streams taking up the complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to open a new stream
+ * periodically check the currently open streams once the limit of open streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+ /** The original file system to which connections are limited. */
+ private final FileSystem originalFs;
+
+ /** The lock that synchronizes connection bookkeeping. */
+ private final ReentrantLock lock;
+
+ /** Condition for threads that are blocking on the availability of new connections. */
+ private final Condition available;
+
+ /** The maximum number of concurrently open output streams. */
+ private final int maxNumOpenOutputStreams;
+
+ /** The maximum number of concurrently open input streams. */
+ private final int maxNumOpenInputStreams;
+
+ /** The maximum number of concurrently open streams (input + output). */
+ private final int maxNumOpenStreamsTotal;
+
+ /** The nanoseconds that opening a stream may wait for availability. */
+ private final long streamOpenTimeoutNanos;
+
+ /** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
+ private final long streamInactivityTimeoutNanos;
+
+ /** The set of currently open output streams. */
+ @GuardedBy("lock")
+ private final HashSet<OutStream> openOutputStreams;
+
+ /** The set of currently open input streams. */
+ @GuardedBy("lock")
+ private final HashSet<InStream> openInputStreams;
+
+ /** The number of output streams reserved to be opened. */
+ @GuardedBy("lock")
+ private int numReservedOutputStreams;
+
+ /** The number of input streams reserved to be opened. */
+ @GuardedBy("lock")
+ private int numReservedInputStreams;
+
+ // ------------------------------------------------------------------------
+
/**
 * Creates a new output connection limiting file system that limits only the total
 * number of concurrently open streams.
 *
 * <p>This constructor configures no stream opening timeout and no inactivity timeout
 * (both are set to zero, meaning "infinite").
 *
 * @param originalFs The original file system to which connections are limited.
 * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit).
 */
public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
	this(originalFs, maxNumOpenStreamsTotal, 0, 0);
}

/**
 * Creates a new output connection limiting file system.
 *
 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
 * then they are terminated as "inactive", to prevent that the limited number of connections gets
 * stuck on only blocked threads.
 *
 * @param originalFs The original file system to which connections are limited.
 * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit).
 * @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when
 *                          no more connections are currently permitted (0 means infinite timeout).
 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
 *                                bytes before it is closed as inactive (0 means infinite timeout).
 */
public LimitedConnectionsFileSystem(
		FileSystem originalFs,
		int maxNumOpenStreamsTotal,
		long streamOpenTimeout,
		long streamInactivityTimeout) {
	// no separate per-direction limits; only the total limit applies
	this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout);
}
+
/**
 * Creates a new output connection limiting file system, limiting input and output streams with
 * potentially different quotas.
 *
 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
 * then they are terminated as "inactive", to prevent that the limited number of connections gets
 * stuck on only blocked threads.
 *
 * @param originalFs The original file system to which connections are limited.
 * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit).
 * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
 * @param maxNumOpenInputStreams The maximum number of concurrent open input streams (0 means no limit).
 * @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when
 *                          no more connections are currently permitted (0 means infinite timeout).
 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
 *                                bytes before it is closed as inactive (0 means infinite timeout).
 */
public LimitedConnectionsFileSystem(
		FileSystem originalFs,
		int maxNumOpenStreamsTotal,
		int maxNumOpenOutputStreams,
		int maxNumOpenInputStreams,
		long streamOpenTimeout,
		long streamInactivityTimeout) {

	// all limits and timeouts use 0 as the "unlimited / infinite" sentinel
	checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
	checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
	checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
	checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)");
	checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");

	this.originalFs = checkNotNull(originalFs, "originalFs");
	// a fair lock, so waiting stream openers are served roughly in arrival order
	this.lock = new ReentrantLock(true);
	this.available = lock.newCondition();
	this.openOutputStreams = new HashSet<>();
	this.openInputStreams = new HashSet<>();
	this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
	this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
	this.maxNumOpenInputStreams = maxNumOpenInputStreams;

	// convert millis to nanos, saturating at Long.MAX_VALUE when the
	// multiplication overflows (detected by the product becoming smaller)
	final long openTimeoutNanos = streamOpenTimeout * 1_000_000;
	final long inactivityTimeoutNanos = streamInactivityTimeout * 1_000_000;

	this.streamOpenTimeoutNanos =
			openTimeoutNanos >= streamOpenTimeout ? openTimeoutNanos : Long.MAX_VALUE;

	this.streamInactivityTimeoutNanos =
			inactivityTimeoutNanos >= streamInactivityTimeout ? inactivityTimeoutNanos : Long.MAX_VALUE;
}
+
+ // ------------------------------------------------------------------------
+
/**
 * Gets the maximum number of concurrently open output streams (0 means no limit).
 */
public int getMaxNumOpenOutputStreams() {
	return maxNumOpenOutputStreams;
}

/**
 * Gets the maximum number of concurrently open input streams (0 means no limit).
 */
public int getMaxNumOpenInputStreams() {
	return maxNumOpenInputStreams;
}

/**
 * Gets the maximum number of concurrently open streams, input + output (0 means no limit).
 */
public int getMaxNumOpenStreamsTotal() {
	return maxNumOpenStreamsTotal;
}

/**
 * Gets the number of milliseconds that opening a stream may wait for availability in the
 * connection pool (0 means wait indefinitely).
 */
public long getStreamOpenTimeout() {
	return streamOpenTimeoutNanos / 1_000_000;
}

/**
 * Gets the milliseconds that a stream may spend not writing any bytes before it is closed
 * as inactive (0 means streams are never closed as inactive).
 */
public long getStreamInactivityTimeout() {
	return streamInactivityTimeoutNanos / 1_000_000;
}
+
+ /**
+ * Gets the total number of open streams (input plus output).
+ */
+ public int getTotalNumberOfOpenStreams() {
+ lock.lock();
+ try {
+ return numReservedOutputStreams + numReservedInputStreams;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Gets the number of currently open output streams.
+ */
+ public int getNumberOfOpenOutputStreams() {
+ lock.lock();
+ try {
+ return numReservedOutputStreams;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Gets the number of currently open input streams.
+ */
+ public int getNumberOfOpenInputStreams() {
+ return numReservedInputStreams;
+ }
+
// ------------------------------------------------------------------------
// input & output stream opening methods
// (these go through the connection-limiting logic in createStream)
// ------------------------------------------------------------------------

@Override
public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
	// counts against the output (and total) connection limit
	return createOutputStream(() -> originalFs.create(f, overwriteMode));
}

@Override
@Deprecated
@SuppressWarnings("deprecation")
public FSDataOutputStream create(
		Path f,
		boolean overwrite,
		int bufferSize,
		short replication,
		long blockSize) throws IOException {

	// counts against the output (and total) connection limit
	return createOutputStream(() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize));
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
	// counts against the input (and total) connection limit
	return createInputStream(() -> originalFs.open(f, bufferSize));
}

@Override
public FSDataInputStream open(Path f) throws IOException {
	// counts against the input (and total) connection limit
	return createInputStream(() -> originalFs.open(f));
}
+
/**
 * Wraps the stream opener so the opened stream is an {@link OutStream} (which un-registers
 * itself from this file system upon closing), then runs it through the connection-limiting
 * logic in {@code createStream}.
 */
private FSDataOutputStream createOutputStream(
		final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {

	final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
			() -> new OutStream(streamOpener.get(), this);

	return createStream(wrappedStreamOpener, openOutputStreams, true);
}

/**
 * Wraps the stream opener so the opened stream is an {@link InStream}, then runs it
 * through the connection-limiting logic in {@code createStream}.
 */
private FSDataInputStream createInputStream(
		final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {

	final SupplierWithException<InStream, IOException> wrappedStreamOpener =
			() -> new InStream(streamOpener.get(), this);

	return createStream(wrappedStreamOpener, openInputStreams, false);
}
+
// ------------------------------------------------------------------------
// other delegating file system methods
// (these forward directly to the wrapped file system and are not
// subject to the connection limits)
// ------------------------------------------------------------------------

@Override
public FileSystemKind getKind() {
	return originalFs.getKind();
}

@Override
public boolean isDistributedFS() {
	return originalFs.isDistributedFS();
}

@Override
public Path getWorkingDirectory() {
	return originalFs.getWorkingDirectory();
}

@Override
public Path getHomeDirectory() {
	return originalFs.getHomeDirectory();
}

@Override
public URI getUri() {
	return originalFs.getUri();
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
	return originalFs.getFileStatus(f);
}

@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
	return originalFs.getFileBlockLocations(file, start, len);
}

@Override
public FileStatus[] listStatus(Path f) throws IOException {
	return originalFs.listStatus(f);
}

@Override
public boolean delete(Path f, boolean recursive) throws IOException {
	return originalFs.delete(f, recursive);
}

@Override
public boolean mkdirs(Path f) throws IOException {
	return originalFs.mkdirs(f);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
	return originalFs.rename(src, dst);
}

@Override
public boolean exists(Path f) throws IOException {
	return originalFs.exists(f);
}

@Override
@Deprecated
@SuppressWarnings("deprecation")
public long getDefaultBlockSize() {
	return originalFs.getDefaultBlockSize();
}
+
+ // ------------------------------------------------------------------------
+
+ private <T extends StreamWithTimeout> T createStream(
+ final SupplierWithException<T, IOException> streamOpener,
+ final HashSet<T> openStreams,
+ final boolean output) throws IOException {
+
+ final int outputLimit = output && maxNumOpenInputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
+ final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
+ final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
+ final int outputCredit = output ? 1 : 0;
+ final int inputCredit = output ? 0 : 1;
+
+ // because waiting for availability may take long, we need to be interruptible here
+ // and handle interrupted exceptions as I/O errors
+ // even though the code is written to make sure the lock is held for a short time only,
+ // making the lock acquisition interruptible helps to guard against the cases where
+ // a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
+ try {
+ lock.lockInterruptibly();
+ try {
+ // some integrity checks
+ assert openOutputStreams.size() <= numReservedOutputStreams;
+ assert openInputStreams.size() <= numReservedInputStreams;
+
+ // wait until there are few enough streams so we can open another
+ waitForAvailability(totalLimit, outputLimit, inputLimit);
+
+ // We do not open the stream here in the locked scope because opening a stream
+ // could take a while. Holding the lock during that operation would block all concurrent
+ // attempts to try and open a stream, effectively serializing all calls to open the streams.
+ numReservedOutputStreams += outputCredit;
+ numReservedInputStreams += inputCredit;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ catch (InterruptedException e) {
+ // restore interruption flag
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupted before opening stream");
+ }
+
+ // open the stream outside the lock.
+ boolean success = false;
+ try {
+ final T out = streamOpener.get();
+
+ // add the stream to the set, need to re-acquire the lock
+ lock.lock();
+ try {
+ openStreams.add(out);
+ } finally {
+ lock.unlock();
+ }
+
+ // good, can now return cleanly
+ success = true;
+ return out;
+ }
+ finally {
+ if (!success) {
+ // remove the reserved credit
+ // we must open this non-interruptibly, because this must succeed!
+ lock.lock();
+ try {
+ numReservedOutputStreams -= outputCredit;
+ numReservedInputStreams -= inputCredit;
+ available.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
/**
 * Blocks until a stream slot is available under all three limits, or until the stream
 * opening timeout expires, in which case an {@link IOException} is thrown.
 *
 * <p>If a stream inactivity timeout is configured, this method additionally scans the
 * currently open streams while waiting and force-closes streams that made no progress
 * for too long, to free up slots.
 *
 * <p>Must be called while holding {@link #lock}.
 *
 * @throws InterruptedException If the waiting thread is interrupted.
 * @throws IOException If no slot became available before the opening timeout expired.
 */
@GuardedBy("lock")
private void waitForAvailability(
		int totalLimit,
		int outputLimit,
		int inputLimit) throws InterruptedException, IOException {

	checkState(lock.isHeldByCurrentThread());

	// compute the deadline of this operations
	final long deadline;
	if (streamOpenTimeoutNanos == 0) {
		// 0 means "no opening timeout"
		deadline = Long.MAX_VALUE;
	} else {
		long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
		// check for overflow
		deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
	}

	// wait for available connections
	long timeLeft;

	if (streamInactivityTimeoutNanos == 0) {
		// simple case: just wait
		while ((timeLeft = (deadline - System.nanoTime())) > 0 &&
				!hasAvailability(totalLimit, outputLimit, inputLimit)) {

			available.await(timeLeft, TimeUnit.NANOSECONDS);
		}
	}
	else {
		// complex case: chase down inactive streams
		// wake up at least twice per inactivity interval, so checks are not missed
		final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;

		long now;
		while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while still within timeout
				!hasAvailability(totalLimit, outputLimit, inputLimit)) {

			// check all streams whether there in one that has been inactive for too long
			if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams, now))) {
				// only wait if we did not manage to close any stream.
				// otherwise eagerly check again if we have availability now (we should have!)
				long timeToWait = Math.min(checkIntervalNanos, timeLeft);
				available.await(timeToWait, TimeUnit.NANOSECONDS);
			}
		}
	}

	// check for timeout
	// we check availability again to catch cases where the timeout expired while waiting
	// to re-acquire the lock
	if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) {
		throw new IOException(String.format(
				"Timeout while waiting for an available stream/connection. " +
						"limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms",
				maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
				numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
	}
}
+
+ @GuardedBy("lock")
+ private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
+ return numReservedOutputStreams < outputLimit &&
+ numReservedInputStreams < inputLimit &&
+ numReservedOutputStreams + numReservedInputStreams < totalLimit;
+ }
+
/**
 * Tries to find and close one stream in the given set that has been inactive for longer
 * than the inactivity timeout. Must be called while holding {@link #lock}.
 *
 * @param streams The set of open streams (input or output) to scan.
 * @param nowNanos The current timestamp, as from {@link System#nanoTime()}.
 * @return True if one stream was closed due to inactivity, false otherwise.
 */
@GuardedBy("lock")
private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
	for (StreamWithTimeout stream : streams) {
		try {
			final StreamProgressTracker tracker = stream.getProgressTracker();

			// If the stream is closed already, it will be removed anyways, so we
			// do not classify it as inactive. We also skip the check if another check happened too recently.
			// NOTE(review): this returns (rather than continuing with the next stream), so
			// the first recently-checked stream ends the scan of this whole set — confirm
			// that this early-out is intended.
			if (stream.isClosed() || nowNanos < tracker.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) {
				// interval since last check not yet over
				return false;
			}
			else if (!tracker.checkNewBytesAndMark(nowNanos)) {
				// no progress since the previous check: reclaim this stream
				stream.closeDueToTimeout();
				return true;
			}
		}
		catch (StreamTimeoutException ignored) {
			// may happen due to races
		}
		catch (IOException e) {
			// only log on debug level here, to avoid log spamming
			LOG.debug("Could not check for stream progress to determine inactivity", e);
		}
	}

	return false;
}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Atomically removes the given output stream from the set of currently open output streams,
+ * and signals that new stream can now be opened.
+ */
+ void unregisterOutputStream(OutStream stream) {
+ lock.lock();
+ try {
+ // only decrement if we actually remove the stream
+ if (openOutputStreams.remove(stream)) {
+ numReservedOutputStreams--;
+ available.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Atomically removes the given input stream from the set of currently open input streams,
+ * and signals that new stream can now be opened.
+ */
+ void unregisterInputStream(InStream stream) {
+ lock.lock();
+ try {
+ // only decrement if we actually remove the stream
+ if (openInputStreams.remove(stream)) {
+ numReservedInputStreams--;
+ available.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
/**
 * A special IOException, indicating a timeout in the data output stream.
 */
public static final class StreamTimeoutException extends IOException {

	private static final long serialVersionUID = -8790922066795901928L;

	/**
	 * Creates a new exception with the standard "closed due to inactivity" message.
	 */
	public StreamTimeoutException() {
		super("Stream closed due to inactivity timeout. " +
				"This is done to prevent inactive streams from blocking the full " +
				"pool of limited connections");
	}

	/**
	 * Creates a new exception that wraps the given one as its cause,
	 * keeping the original message.
	 */
	public StreamTimeoutException(StreamTimeoutException other) {
		super(other.getMessage(), other);
	}
}
+
+ // ------------------------------------------------------------------------
+
/**
 * Interface for streams that can be checked for inactivity, so that the file system
 * can reclaim streams that make no progress.
 */
private interface StreamWithTimeout extends Closeable {

	/**
	 * Gets the progress tracker for this stream, which records the stream position
	 * observed at the time of the last inactivity check.
	 */
	StreamProgressTracker getProgressTracker();

	/**
	 * Gets the current position in the stream, as in number of bytes read or written.
	 */
	long getPos() throws IOException;

	/**
	 * Closes the stream asynchronously with a special exception that indicates closing due
	 * to lack of progress.
	 */
	void closeDueToTimeout() throws IOException;

	/**
	 * Checks whether the stream was closed already.
	 */
	boolean isClosed();
}
+
+ // ------------------------------------------------------------------------
+
/**
 * A tracker for stream progress. This records the number of bytes read / written together
 * with a timestamp when the last check happened.
 *
 * <p>NOTE(review): the fields are volatile for visibility, but {@link #checkNewBytesAndMark(long)}
 * is not atomic by itself — it appears to rely on being invoked under the file system's lock
 * (see {@code closeInactiveStream}); confirm there are no other call sites.
 */
private static final class StreamProgressTracker {

	/** The tracked stream. */
	private final StreamWithTimeout stream;

	/** The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)}
	 * method was called. It is important to initialize this with {@code -1} so that the
	 * first check (0 bytes) always appears to have made progress. */
	private volatile long lastCheckBytes = -1;

	/** The timestamp when the last inactivity evaluation was made. */
	private volatile long lastCheckTimestampNanos;

	StreamProgressTracker(StreamWithTimeout stream) {
		this.stream = stream;
	}

	/**
	 * Gets the timestamp when the last inactivity evaluation was made.
	 */
	public long getLastCheckTimestampNanos() {
		return lastCheckTimestampNanos;
	}

	/**
	 * Checks whether there were new bytes since the last time this method was invoked.
	 * This also sets the given timestamp, to be read via {@link #getLastCheckTimestampNanos()}.
	 *
	 * @param timestamp The timestamp (nanoseconds) to record for this check.
	 * @return True, if there were new bytes, false if not.
	 * @throws IOException If querying the stream position fails.
	 */
	public boolean checkNewBytesAndMark(long timestamp) throws IOException {
		// remember the time when checked
		lastCheckTimestampNanos = timestamp;

		final long bytesNow = stream.getPos();
		if (bytesNow > lastCheckBytes) {
			lastCheckBytes = bytesNow;
			return true;
		}
		else {
			return false;
		}
	}
}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A data output stream that wraps a given data output stream and un-registers
+ * from a given connection-limiting file system
+ * (via {@link LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)})
+ * upon closing.
+ *
+ * <p>All delegating methods route IOExceptions through {@link #handleIOException(IOException)}
+ * so that an exception raised after an external timeout-close surfaces as a
+ * {@link StreamTimeoutException} rather than as a confusing secondary failure.
+ */
+ private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout {
+
+ /** The original data output stream to write to. */
+ private final FSDataOutputStream originalStream;
+
+ /** The connection-limiting file system to un-register from. */
+ private final LimitedConnectionsFileSystem fs;
+
+ /** The progress tracker for this stream. */
+ private final StreamProgressTracker progressTracker;
+
+ /** An exception with which the stream has been externally closed. */
+ private volatile StreamTimeoutException timeoutException;
+
+ /** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
+ // NOTE(review): never reassigned after construction — could be declared final.
+ private AtomicBoolean closed = new AtomicBoolean();
+
+ OutStream(FSDataOutputStream originalStream, LimitedConnectionsFileSystem fs) {
+ this.originalStream = checkNotNull(originalStream);
+ this.fs = checkNotNull(fs);
+ this.progressTracker = new StreamProgressTracker(this);
+ }
+
+ // --- FSDataOutputStream API implementation
+
+ @Override
+ public void write(int b) throws IOException {
+ try {
+ originalStream.write(b);
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ try {
+ originalStream.write(b, off, len);
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ try {
+ return originalStream.getPos();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ return -1; // silence the compiler
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ originalStream.flush();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ try {
+ originalStream.sync();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // compareAndSet guarantees the close/unregister sequence runs at most once,
+ // even if close() races with closeDueToTimeout().
+ if (closed.compareAndSet(false, true)) {
+ try {
+ originalStream.close();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ finally {
+ // always release the connection slot, even if closing the stream failed
+ fs.unregisterOutputStream(this);
+ }
+ }
+ }
+
+ @Override
+ public void closeDueToTimeout() throws IOException {
+ // set the marker first so a concurrent caller failing inside close()
+ // gets the timeout exception from handleIOException
+ this.timeoutException = new StreamTimeoutException();
+ close();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ @Override
+ public StreamProgressTracker getProgressTracker() {
+ return progressTracker;
+ }
+
+ /**
+ * Rethrows the given exception, unless the stream was already closed due to a timeout,
+ * in which case a fresh {@link StreamTimeoutException} (capturing this call's stack
+ * trace) is thrown with the given exception attached as suppressed.
+ */
+ private void handleIOException(IOException exception) throws IOException {
+ if (timeoutException == null) {
+ throw exception;
+ } else {
+ // throw a new exception to capture this call's stack trace
+ // the new exception is forwarded as a suppressed exception
+ StreamTimeoutException te = new StreamTimeoutException(timeoutException);
+ te.addSuppressed(exception);
+ throw te;
+ }
+ }
+ }
+
+ /**
+ * A data input stream that wraps a given data input stream and un-registers
+ * from a given connection-limiting file system
+ * (via {@link LimitedConnectionsFileSystem#unregisterInputStream(InStream)})
+ * upon closing.
+ *
+ * <p>All delegating methods route IOExceptions through {@link #handleIOException(IOException)}
+ * so that an exception raised after an external timeout-close surfaces as a
+ * {@link StreamTimeoutException} rather than as a confusing secondary failure.
+ */
+ private static final class InStream extends FSDataInputStream implements StreamWithTimeout {
+
+ /** The original data input stream to read from. */
+ private final FSDataInputStream originalStream;
+
+ /** The connection-limiting file system to un-register from. */
+ private final LimitedConnectionsFileSystem fs;
+
+ /** An exception with which the stream has been externally closed. */
+ private volatile StreamTimeoutException timeoutException;
+
+ /** The progress tracker for this stream. */
+ private final StreamProgressTracker progressTracker;
+
+ /** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
+ // NOTE(review): never reassigned after construction — could be declared final.
+ private AtomicBoolean closed = new AtomicBoolean();
+
+ InStream(FSDataInputStream originalStream, LimitedConnectionsFileSystem fs) {
+ this.originalStream = checkNotNull(originalStream);
+ this.fs = checkNotNull(fs);
+ this.progressTracker = new StreamProgressTracker(this);
+ }
+
+ // --- FSDataInputStream API implementation (comment fixed: was "FSDataOutputStream", a copy-paste slip)
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return originalStream.read();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ return 0; // silence the compiler
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ return originalStream.read(b);
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ return 0; // silence the compiler
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ try {
+ return originalStream.read(b, off, len);
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ return 0; // silence the compiler
+ }
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ try {
+ return originalStream.skip(n);
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ return 0L; // silence the compiler
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ try {
+ return originalStream.available();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ return 0; // silence the compiler
+ }
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ // mark() does not declare IOException, so no timeout translation is possible here
+ originalStream.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException {
+ try {
+ originalStream.reset();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ @Override
+ public boolean markSupported() {
+ return originalStream.markSupported();
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ try {
+ originalStream.seek(desired);
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ try {
+ return originalStream.getPos();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ return 0; // silence the compiler
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // compareAndSet guarantees the close/unregister sequence runs at most once,
+ // even if close() races with closeDueToTimeout().
+ if (closed.compareAndSet(false, true)) {
+ try {
+ originalStream.close();
+ }
+ catch (IOException e) {
+ handleIOException(e);
+ }
+ finally {
+ // always release the connection slot, even if closing the stream failed
+ fs.unregisterInputStream(this);
+ }
+ }
+ }
+
+ @Override
+ public void closeDueToTimeout() throws IOException {
+ // set the marker first so a concurrent caller failing inside close()
+ // gets the timeout exception from handleIOException
+ this.timeoutException = new StreamTimeoutException();
+ close();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ @Override
+ public StreamProgressTracker getProgressTracker() {
+ return progressTracker;
+ }
+
+ /**
+ * Rethrows the given exception, unless the stream was already closed due to a timeout,
+ * in which case a fresh {@link StreamTimeoutException} (capturing this call's stack
+ * trace) is thrown with the given exception attached as suppressed.
+ */
+ private void handleIOException(IOException exception) throws IOException {
+ if (timeoutException == null) {
+ throw exception;
+ } else {
+ // throw a new exception to capture this call's stack trace
+ // the new exception is forwarded as a suppressed exception
+ StreamTimeoutException te = new StreamTimeoutException(timeoutException);
+ te.addSuppressed(exception);
+ throw te;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A simple configuration data object capturing the settings for limited connections.
+ */
+ public static class ConnectionLimitingSettings {
+
+ /** The limit for the total number of connections, or 0, if no limit. */
+ public final int limitTotal;
+
+ /** The limit for the number of input stream connections, or 0, if no limit. */
+ public final int limitInput;
+
+ /** The limit for the number of output stream connections, or 0, if no limit. */
+ public final int limitOutput;
+
+ /** The stream opening timeout for a stream, in milliseconds. */
+ public final long streamOpenTimeout;
+
+ /** The inactivity timeout for a stream, in milliseconds. */
+ public final long streamInactivityTimeout;
+
+ /**
+ * Creates a new ConnectionLimitingSettings with the given parameters.
+ *
+ * @param limitTotal The limit for the total number of connections, or 0, if no limit.
+ * @param limitInput The limit for the number of input stream connections, or 0, if no limit.
+ * @param limitOutput The limit for the number of output stream connections, or 0, if no limit.
+ * @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when
+ * no more connections are currently permitted.
+ * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
+ * bytes before it is closed as inactive.
+ * @throws IllegalArgumentException If any argument is negative (via checkArgument).
+ */
+ public ConnectionLimitingSettings(
+ int limitTotal,
+ int limitInput,
+ int limitOutput,
+ long streamOpenTimeout,
+ long streamInactivityTimeout) {
+ checkArgument(limitTotal >= 0);
+ checkArgument(limitInput >= 0);
+ checkArgument(limitOutput >= 0);
+ checkArgument(streamOpenTimeout >= 0);
+ checkArgument(streamInactivityTimeout >= 0);
+
+ this.limitTotal = limitTotal;
+ this.limitInput = limitInput;
+ this.limitOutput = limitOutput;
+ this.streamOpenTimeout = streamOpenTimeout;
+ this.streamInactivityTimeout = streamInactivityTimeout;
+ }
+
+ // --------------------------------------------------------------------
+
+ /**
+ * Parses and returns the settings for connection limiting, for the file system with
+ * the given file system scheme.
+ *
+ * @param config The configuration to check.
+ * @param fsScheme The file system scheme.
+ *
+ * @return The parsed configuration, or null, if no connection limiting is configured.
+ */
+ @Nullable
+ public static ConnectionLimitingSettings fromConfig(Configuration config, String fsScheme) {
+ checkNotNull(fsScheme, "fsScheme");
+ checkNotNull(config, "config");
+
+ final ConfigOption<Integer> totalLimitOption = CoreOptions.fileSystemConnectionLimit(fsScheme);
+ final ConfigOption<Integer> limitInOption = CoreOptions.fileSystemConnectionLimitIn(fsScheme);
+ final ConfigOption<Integer> limitOutOption = CoreOptions.fileSystemConnectionLimitOut(fsScheme);
+
+ final int totalLimit = config.getInteger(totalLimitOption);
+ final int limitIn = config.getInteger(limitInOption);
+ final int limitOut = config.getInteger(limitOutOption);
+
+ // reject values below -1 before interpreting them
+ checkLimit(totalLimit, totalLimitOption);
+ checkLimit(limitIn, limitInOption);
+ checkLimit(limitOut, limitOutOption);
+
+ // create the settings only, if at least one limit is configured
+ // NOTE(review): the '||' below returns null unless ALL THREE limits are configured,
+ // contradicting the "at least one limit" comment above — this likely should be '&&'.
+ // It also makes the '== -1 ? 0' mappings further down unreachable, since the else
+ // branch is only entered when all three values are > 0. TODO confirm intended semantics.
+ if (totalLimit <= 0 || limitIn <= 0 || limitOut <= 0) {
+ // no limit configured
+ return null;
+ }
+ else {
+ final ConfigOption<Long> openTimeoutOption =
+ CoreOptions.fileSystemConnectionLimitTimeout(fsScheme);
+ final ConfigOption<Long> inactivityTimeoutOption =
+ CoreOptions.fileSystemConnectionLimitStreamInactivityTimeout(fsScheme);
+
+ final long openTimeout = config.getLong(openTimeoutOption);
+ final long inactivityTimeout = config.getLong(inactivityTimeoutOption);
+
+ checkTimeout(openTimeout, openTimeoutOption);
+ checkTimeout(inactivityTimeout, inactivityTimeoutOption);
+
+ // map the "-1 = unset" sentinel to 0 ("no limit"); dead under the '||' above, see NOTE
+ return new ConnectionLimitingSettings(
+ totalLimit == -1 ? 0 : totalLimit,
+ limitIn == -1 ? 0 : limitIn,
+ limitOut == -1 ? 0 : limitOut,
+ openTimeout,
+ inactivityTimeout);
+ }
+ }
+
+ /** Rejects limit values below {@code -1} ({@code -1} is the "unset" sentinel). */
+ private static void checkLimit(int value, ConfigOption<Integer> option) {
+ if (value < -1) {
+ throw new IllegalConfigurationException("Invalid value for '" + option.key() + "': " + value);
+ }
+ }
+
+ /** Rejects negative timeout values. */
+ private static void checkTimeout(long timeout, ConfigOption<Long> option) {
+ if (timeout < 0) {
+ throw new IllegalConfigurationException("Invalid value for '" + option.key() + "': " + timeout);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
index 7cbc2bd..785391a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.local.LocalFileSystem;
import java.net.URI;
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
new file mode 100644
index 0000000..63be9bf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+/**
+ * A functional interface for a {@link java.util.function.Supplier} that may
+ * throw exceptions.
+ *
+ * @param <R> The type of the result of the supplier.
+ * @param <E> The type of Exceptions thrown by this function.
+ */
+@FunctionalInterface
+public interface SupplierWithException<R, E extends Throwable> {
+
+ /**
+ * Gets the result of this supplier.
+ *
+ * @return The result of thus supplier.
+ * @throws E This function may throw an exception.
+ */
+ R get() throws E;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
index 43c79c1..1cbc8f2 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
@@ -61,7 +61,7 @@ public class FilesystemSchemeConfigTest extends TestLogger {
@Test
public void testExplicitlySetToLocal() throws Exception {
final Configuration conf = new Configuration();
- conf.setString(ConfigConstants.FILESYSTEM_SCHEME, LocalFileSystem.getLocalFsURI().toString());
+ conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, LocalFileSystem.getLocalFsURI().toString());
FileSystem.initialize(conf);
URI justPath = new URI(tempFolder.newFile().toURI().getPath());
@@ -74,7 +74,7 @@ public class FilesystemSchemeConfigTest extends TestLogger {
@Test
public void testExplicitlySetToOther() throws Exception {
final Configuration conf = new Configuration();
- conf.setString(ConfigConstants.FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
+ conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
FileSystem.initialize(conf);
URI justPath = new URI(tempFolder.newFile().toURI().getPath());
@@ -92,7 +92,7 @@ public class FilesystemSchemeConfigTest extends TestLogger {
@Test
public void testExplicitlyPathTakesPrecedence() throws Exception {
final Configuration conf = new Configuration();
- conf.setString(ConfigConstants.FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
+ conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
FileSystem.initialize(conf);
URI pathAndScheme = tempFolder.newFile().toURI();
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
new file mode 100644
index 0000000..4742a7e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.testutils.TestFileSystem;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that validate that the configuration for limited FS
+ * connections are properly picked up.
+ */
+public class LimitedConnectionsConfigurationTest {
+
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
+ @Test
+ public void testConfiguration() throws Exception {
+ final String fsScheme = TestFileSystem.SCHEME;
+
+ // nothing configured, we should get a regular file system
+ FileSystem schemeFs = FileSystem.get(URI.create(fsScheme + ":///a/b/c"));
+ FileSystem localFs = FileSystem.get(tempDir.newFile().toURI());
+
+ assertFalse(schemeFs instanceof LimitedConnectionsFileSystem);
+ assertFalse(localFs instanceof LimitedConnectionsFileSystem);
+
+ // configure some limits, which should cause "fsScheme" to be limited
+
+ final Configuration config = new Configuration();
+ config.setInteger("fs." + fsScheme + ".limit.total", 42);
+ config.setInteger("fs." + fsScheme + ".limit.input", 11);
+ config.setInteger("fs." + fsScheme + ".limit.output", 40);
+ config.setInteger("fs." + fsScheme + ".limit.timeout", 12345);
+ config.setInteger("fs." + fsScheme + ".limit.stream-timeout", 98765);
+
+ try {
+ FileSystem.initialize(config);
+
+ schemeFs = FileSystem.get(URI.create(fsScheme + ":///a/b/c"));
+ localFs = FileSystem.get(tempDir.newFile().toURI());
+
+ assertTrue(schemeFs instanceof LimitedConnectionsFileSystem);
+ assertFalse(localFs instanceof LimitedConnectionsFileSystem);
+
+ LimitedConnectionsFileSystem limitedFs = (LimitedConnectionsFileSystem) schemeFs;
+ assertEquals(42, limitedFs.getMaxNumOpenStreamsTotal());
+ assertEquals(11, limitedFs.getMaxNumOpenInputStreams());
+ assertEquals(40, limitedFs.getMaxNumOpenOutputStreams());
+ assertEquals(12345, limitedFs.getStreamOpenTimeout());
+ assertEquals(98765, limitedFs.getStreamInactivityTimeout());
+ }
+ finally {
+ // clear all settings
+ FileSystem.initialize(new Configuration());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
new file mode 100644
index 0000000..b133677
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that the method delegation works properly the {@link LimitedConnectionsFileSystem}
+ * and its created input and output streams.
+ */
+public class LimitedConnectionsFileSystemDelegationTest {
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testDelegateFsMethods() throws IOException {
+ final FileSystem fs = mock(FileSystem.class);
+ when(fs.open(any(Path.class))).thenReturn(mock(FSDataInputStream.class));
+ when(fs.open(any(Path.class), anyInt())).thenReturn(mock(FSDataInputStream.class));
+ when(fs.create(any(Path.class), anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
+ when(fs.create(any(Path.class), any(WriteMode.class))).thenReturn(mock(FSDataOutputStream.class));
+ when(fs.create(any(Path.class), anyBoolean(), anyInt(), anyShort(), anyLong())).thenReturn(mock(FSDataOutputStream.class));
+
+ final LimitedConnectionsFileSystem lfs = new LimitedConnectionsFileSystem(fs, 1000);
+ final Random rnd = new Random();
+
+ lfs.isDistributedFS();
+ verify(fs).isDistributedFS();
+
+ lfs.getWorkingDirectory();
+ verify(fs).isDistributedFS();
+
+ lfs.getHomeDirectory();
+ verify(fs).getHomeDirectory();
+
+ lfs.getUri();
+ verify(fs).getUri();
+
+ {
+ Path path = mock(Path.class);
+ lfs.getFileStatus(path);
+ verify(fs).getFileStatus(path);
+ }
+
+ {
+ FileStatus path = mock(FileStatus.class);
+ int pos = rnd.nextInt();
+ int len = rnd.nextInt();
+ lfs.getFileBlockLocations(path, pos, len);
+ verify(fs).getFileBlockLocations(path, pos, len);
+ }
+
+ {
+ Path path = mock(Path.class);
+ int bufferSize = rnd.nextInt();
+ lfs.open(path, bufferSize);
+ verify(fs).open(path, bufferSize);
+ }
+
+ {
+ Path path = mock(Path.class);
+ lfs.open(path);
+ verify(fs).open(path);
+ }
+
+ lfs.getDefaultBlockSize();
+ verify(fs).getDefaultBlockSize();
+
+ {
+ Path path = mock(Path.class);
+ lfs.listStatus(path);
+ verify(fs).listStatus(path);
+ }
+
+ {
+ Path path = mock(Path.class);
+ lfs.exists(path);
+ verify(fs).exists(path);
+ }
+
+ {
+ Path path = mock(Path.class);
+ boolean recursive = rnd.nextBoolean();
+ lfs.delete(path, recursive);
+ verify(fs).delete(path, recursive);
+ }
+
+ {
+ Path path = mock(Path.class);
+ lfs.mkdirs(path);
+ verify(fs).mkdirs(path);
+ }
+
+ {
+ Path path = mock(Path.class);
+ boolean overwrite = rnd.nextBoolean();
+ int bufferSize = rnd.nextInt();
+ short replication = (short) rnd.nextInt();
+ long blockSize = rnd.nextInt();
+
+ lfs.create(path, overwrite, bufferSize, replication, blockSize);
+ verify(fs).create(path, overwrite, bufferSize, replication, blockSize);
+ }
+
+ {
+ Path path = mock(Path.class);
+ WriteMode mode = rnd.nextBoolean() ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;
+ lfs.create(path, mode);
+ verify(fs).create(path, mode);
+ }
+
+ {
+ Path path1 = mock(Path.class);
+ Path path2 = mock(Path.class);
+ lfs.rename(path1, path2);
+ verify(fs).rename(path1, path2);
+ }
+
+ {
+ FileSystemKind kind = rnd.nextBoolean() ? FileSystemKind.FILE_SYSTEM : FileSystemKind.OBJECT_STORE;
+ when(fs.getKind()).thenReturn(kind);
+ assertEquals(kind, lfs.getKind());
+ verify(fs).getKind();
+ }
+ }
+
+ @Test
+ public void testDelegateOutStreamMethods() throws IOException {
+
+ // mock the output stream
+ final FSDataOutputStream mockOut = mock(FSDataOutputStream.class);
+ final long outPos = 46651L;
+ when(mockOut.getPos()).thenReturn(outPos);
+
+ final FileSystem fs = mock(FileSystem.class);
+ when(fs.create(any(Path.class), any(WriteMode.class))).thenReturn(mockOut);
+
+ final LimitedConnectionsFileSystem lfs = new LimitedConnectionsFileSystem(fs, 100);
+ final FSDataOutputStream out = lfs.create(mock(Path.class), WriteMode.OVERWRITE);
+
+ // validate the output stream
+
+ out.write(77);
+ verify(mockOut).write(77);
+
+ {
+ byte[] bytes = new byte[1786];
+ out.write(bytes, 100, 111);
+ verify(mockOut).write(bytes, 100, 111);
+ }
+
+ assertEquals(outPos, out.getPos());
+
+ out.flush();
+ verify(mockOut).flush();
+
+ out.sync();
+ verify(mockOut).sync();
+
+ out.close();
+ verify(mockOut).close();
+ }
+
+ @Test
+ public void testDelegateInStreamMethods() throws IOException {
+ // mock the input stream
+ final FSDataInputStream mockIn = mock(FSDataInputStream.class);
+ final int value = 93;
+ final int bytesRead = 11;
+ final long inPos = 93;
+ final int available = 17;
+ final boolean markSupported = true;
+ when(mockIn.read()).thenReturn(value);
+ when(mockIn.read(any(byte[].class), anyInt(), anyInt())).thenReturn(11);
+ when(mockIn.getPos()).thenReturn(inPos);
+ when(mockIn.available()).thenReturn(available);
+ when(mockIn.markSupported()).thenReturn(markSupported);
+
+ final FileSystem fs = mock(FileSystem.class);
+ when(fs.open(any(Path.class))).thenReturn(mockIn);
+
+ final LimitedConnectionsFileSystem lfs = new LimitedConnectionsFileSystem(fs, 100);
+ final FSDataInputStream in = lfs.open(mock(Path.class));
+
+ // validate the input stream
+
+ assertEquals(value, in.read());
+ assertEquals(bytesRead, in.read(new byte[11], 2, 5));
+
+ assertEquals(inPos, in.getPos());
+
+ in.seek(17876);
+ verify(mockIn).seek(17876);
+
+ assertEquals(available, in.available());
+
+ assertEquals(markSupported, in.markSupported());
+
+ in.mark(9876);
+ verify(mockIn).mark(9876);
+
+ in.close();
+ verify(mockIn).close();
+ }
+}