You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/24 12:02:51 UTC

[1/3] flink git commit: [hotfix] [core] Fix lots of checkstyle errors in core.fs

Repository: flink
Updated Branches:
  refs/heads/release-1.4 3b58038d6 -> a11e2cf0b


[hotfix] [core] Fix lots of checkstyle errors in core.fs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5e156f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5e156f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5e156f7

Branch: refs/heads/release-1.4
Commit: b5e156f79ae7e9cd2f8d5008f0c350e10ad4a821
Parents: 3b58038
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 8 20:14:34 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 24 10:47:23 2017 +0100

----------------------------------------------------------------------
 .../core/fs/AbstractMultiFSDataInputStream.java |   8 +-
 .../org/apache/flink/core/fs/BlockLocation.java |   6 +-
 .../apache/flink/core/fs/CloseableRegistry.java |   8 +-
 .../flink/core/fs/ClosingFSDataInputStream.java |   4 +-
 .../core/fs/ClosingFSDataOutputStream.java      |   4 +-
 .../apache/flink/core/fs/FSDataInputStream.java |   6 +-
 .../flink/core/fs/FSDataInputStreamWrapper.java |   2 +-
 .../flink/core/fs/FSDataOutputStream.java       |  40 +++---
 .../core/fs/FSDataOutputStreamWrapper.java      |   2 +-
 .../apache/flink/core/fs/FileInputSplit.java    |  30 ++--
 .../org/apache/flink/core/fs/FileStatus.java    |  19 ++-
 .../org/apache/flink/core/fs/FileSystem.java    | 137 ++++++++++---------
 .../apache/flink/core/fs/FileSystemFactory.java |   2 +-
 .../java/org/apache/flink/core/fs/Path.java     |  59 ++++----
 .../flink/core/fs/UnsupportedSchemeFactory.java |   4 +-
 .../flink/core/fs/WrappingProxyCloseable.java   |   4 +-
 16 files changed, 163 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
index a161ceb..e01ac2e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/AbstractMultiFSDataInputStream.java
@@ -32,13 +32,13 @@ import java.io.IOException;
 @Internal
 public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
 
-	/** Inner stream for the currently accessed segment of the virtual global stream */
+	/** Inner stream for the currently accessed segment of the virtual global stream. */
 	protected FSDataInputStream delegate;
 
-	/** Position in the virtual global stream */
+	/** Position in the virtual global stream. */
 	protected long totalPos;
 
-	/** Total available bytes in the virtual global stream */
+	/** Total available bytes in the virtual global stream. */
 	protected long totalAvailable;
 
 	public AbstractMultiFSDataInputStream() {
@@ -48,7 +48,7 @@ public abstract class AbstractMultiFSDataInputStream extends FSDataInputStream {
 	@Override
 	public void seek(long desired) throws IOException {
 
-		if(desired == totalPos) {
+		if (desired == totalPos) {
 			return;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
index dff0c3e..fcdf905 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/BlockLocation.java
@@ -30,7 +30,7 @@ public interface BlockLocation extends Comparable<BlockLocation> {
 
 	/**
 	 * Get the list of hosts (hostname) hosting this block.
-	 * 
+	 *
 	 * @return A list of hosts (hostname) hosting this block.
 	 * @throws IOException
 	 *         thrown if the list of hosts could not be retrieved
@@ -39,14 +39,14 @@ public interface BlockLocation extends Comparable<BlockLocation> {
 
 	/**
 	 * Get the start offset of the file associated with this block.
-	 * 
+	 *
 	 * @return The start offset of the file associated with this block.
 	 */
 	long getOffset();
 
 	/**
 	 * Get the length of the block.
-	 * 
+	 *
 	 * @return the length of the block
 	 */
 	long getLength();

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 87d33d2..5f1c9fb 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -29,10 +29,10 @@ import java.util.Map;
 
 /**
  * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
+ *
+ * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
+ *
+ * <p>All methods in this class are thread-safe.
  */
 @Internal
 public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
index 173a890..1a62f62 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -26,8 +26,8 @@ import java.io.IOException;
 /**
  * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataInputStream} that is used to
  * implement a safety net against unclosed streams.
- * <p>
- * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ *
+ * <p>See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
  */
 @Internal
 public class ClosingFSDataInputStream

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
index cb7de92..0f252a4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -26,8 +26,8 @@ import java.io.IOException;
 /**
  * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataOutputStream} that is used to
  * implement a safety net against unclosed streams.
- * <p>
- * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ *
+ * <p>See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
  */
 @Internal
 public class ClosingFSDataOutputStream

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
index 44dbcb1..fa931c6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
@@ -25,7 +25,7 @@ import java.io.InputStream;
 
 /**
  * Interface for a data input stream to a file on a {@link FileSystem}.
- * 
+ *
  * <p>This extends the {@link java.io.InputStream} with methods for accessing
  * the stream's {@link #getPos() current position} and
  * {@link #seek(long) seeking} to a desired position.
@@ -36,7 +36,7 @@ public abstract class FSDataInputStream extends InputStream {
 	/**
 	 * Seek to the given offset from the start of the file. The next read() will be from that location.
 	 * Can't seek past the end of the file.
-	 * 
+	 *
 	 * @param desired
 	 *        the desired offset
 	 * @throws IOException Thrown if an error occurred while seeking inside the input stream.
@@ -47,7 +47,7 @@ public abstract class FSDataInputStream extends InputStream {
 	 * Gets the current position in the input stream.
 	 *
 	 * @return current position in the input stream
-	 * @throws IOException Thrown if an I/O error occurred in the underlying stream 
+	 * @throws IOException Thrown if an I/O error occurred in the underlying stream
 	 *                     implementation while accessing the stream's position.
 	 */
 	public abstract long getPos() throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
index d2eb9f2..6a3874f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.WrappingProxy;
 import java.io.IOException;
 
 /**
- * Simple forwarding wrapper around {@link FSDataInputStream}
+ * Simple forwarding wrapper around {@link FSDataInputStream}.
  */
 @Internal
 public class FSDataInputStreamWrapper extends FSDataInputStream implements WrappingProxy<FSDataInputStream> {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
index a8df5c1..4a6e01d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
@@ -26,22 +26,22 @@ import java.io.OutputStream;
 /**
  * An output stream to a file that is created via a {@link FileSystem}.
  * This class extends the base {@link java.io.OutputStream} with some additional important methods.
- * 
+ *
  * <h2>Data Persistence Guarantees</h2>
- * 
- * These streams are used to persistently store data, both for results of streaming applications
+ *
+ * <p>These streams are used to persistently store data, both for results of streaming applications
  * and for fault tolerance and recovery. It is therefore crucial that the persistence semantics
  * of these streams are well defined.
- * 
+ *
  * <p>Please refer to the class-level docs of {@link FileSystem} for the definition of data persistence
  * via Flink's FileSystem abstraction and the {@code FSDataOutputStream}.
- * 
+ *
  * <h2>Thread Safety</h2>
- * 
- * Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread safe.
+ *
+ * <p>Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread safe.
  * Instances of {@code FSDataOutputStream} should not be passed between threads, because there
  * are no guarantees about the order of visibility of operations across threads.
- * 
+ *
  * @see FileSystem
  * @see FSDataInputStream
  */
@@ -52,13 +52,13 @@ public abstract class FSDataOutputStream extends OutputStream {
 	 * Gets the position of the stream (non-negative), defined as the number of bytes
 	 * from the beginning of the file to the current writing position. The position
 	 * corresponds to the zero-based index of the next byte that will be written.
-	 * 
+	 *
 	 * <p>This method must report accurately report the current position of the stream.
 	 * Various components of the high-availability and recovery logic rely on the accurate
-	 * 
+	 *
 	 * @return The current position in the stream, defined as the number of bytes
 	 *         from the beginning of the file to the current writing position.
-	 * 
+	 *
 	 * @throws IOException Thrown if an I/O error occurs while obtaining the position from
 	 *                     the stream implementation.
 	 */
@@ -68,14 +68,14 @@ public abstract class FSDataOutputStream extends OutputStream {
 	 * Flushes the stream, writing any data currently buffered in stream implementation
 	 * to the proper output stream. After this method has been called, the stream implementation
 	 * must not hold onto any buffered data any more.
-	 * 
+	 *
 	 * <p>A completed flush does not mean that the data is necessarily persistent. Data
 	 * persistence can is only assumed after calls to {@link #close()} or {@link #sync()}.
-	 * 
+	 *
 	 * <p>Implementation note: This overrides the method defined in {@link OutputStream}
 	 * as abstract to force implementations of the {@code FSDataOutputStream} to implement
 	 * this method directly.
-	 * 
+	 *
 	 * @throws IOException Thrown if an I/O error occurs while flushing the stream.
 	 */
 	public abstract void flush() throws IOException;
@@ -84,9 +84,7 @@ public abstract class FSDataOutputStream extends OutputStream {
 	 * Flushes the data all the way to the persistent non-volatile storage (for example disks).
 	 * The method behaves similar to the <i>fsync</i> function, forcing all data to
 	 * be persistent on the devices.
-	 * 
-	 * <p>
-	 * 
+	 *
 	 * @throws IOException Thrown if an I/O error occurs
 	 */
 	public abstract void sync() throws IOException;
@@ -95,20 +93,20 @@ public abstract class FSDataOutputStream extends OutputStream {
 	 * Closes the output stream. After this method returns, the implementation must guarantee
 	 * that all data written to the stream is persistent/visible, as defined in the
 	 * {@link FileSystem class-level docs}.
-	 * 
+	 *
 	 * <p>The above implies that the method must block until persistence can be guaranteed.
 	 * For example for distributed replicated file systems, the method must block until the
 	 * replication quorum has been reached. If the calling thread is interrupted in the
 	 * process, it must fail with an {@code IOException} to indicate that persistence cannot
 	 * be guaranteed.
-	 * 
+	 *
 	 * <p>If this method throws an exception, the data in the stream cannot be assumed to be
 	 * persistent.
-	 * 
+	 *
 	 * <p>Implementation note: This overrides the method defined in {@link OutputStream}
 	 * as abstract to force implementations of the {@code FSDataOutputStream} to implement
 	 * this method directly.
-	 *         
+	 *
 	 * @throws IOException Thrown, if an error occurred while closing the stream or guaranteeing
 	 *                     that the data is persistent.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
index f015012..21e68ef 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.WrappingProxy;
 import java.io.IOException;
 
 /**
- * Simple forwarding wrapper around {@link FSDataInputStream}
+ * Simple forwarding wrapper around {@link FSDataInputStream}.
  */
 @Internal
 public class FSDataOutputStreamWrapper extends FSDataOutputStream implements WrappingProxy<FSDataOutputStream> {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index 8af0a20..bef13fa 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
 
 /**
  * A file input split provides information on a particular part of a file, possibly
- * hosted on a distributed file system and replicated among several hosts. 
+ * hosted on a distributed file system and replicated among several hosts.
  */
 @Public
 public class FileInputSplit extends LocatableInputSplit {
@@ -34,16 +34,16 @@ public class FileInputSplit extends LocatableInputSplit {
 	private final Path file;
 
 	/** The position of the first byte in the file to process. */
-	private long start;
+	private final long start;
 
 	/** The number of bytes in the file to process. */
-	private long length;
+	private final long length;
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Constructs a split with host information.
-	 * 
+	 *
 	 * @param num
 	 *        the number of this input split
 	 * @param file
@@ -57,17 +57,17 @@ public class FileInputSplit extends LocatableInputSplit {
 	 */
 	public FileInputSplit(int num, Path file, long start, long length, String[] hosts) {
 		super(num, hosts);
-		
+
 		this.file = file;
 		this.start = start;
 		this.length = length;
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the path of the file containing this split's data.
-	 * 
+	 *
 	 * @return the path of the file containing this split's data.
 	 */
 	public Path getPath() {
@@ -76,7 +76,7 @@ public class FileInputSplit extends LocatableInputSplit {
 
 	/**
 	 * Returns the position of the first byte in the file to process.
-	 * 
+	 *
 	 * @return the position of the first byte in the file to process
 	 */
 	public long getStart() {
@@ -85,20 +85,20 @@ public class FileInputSplit extends LocatableInputSplit {
 
 	/**
 	 * Returns the number of bytes in the file to process.
-	 * 
+	 *
 	 * @return the number of bytes in the file to process
 	 */
 	public long getLength() {
 		return length;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public int hashCode() {
 		return getSplitNumber() ^ (file == null ? 0 : file.hashCode());
 	}
-	
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj == this) {
@@ -106,7 +106,7 @@ public class FileInputSplit extends LocatableInputSplit {
 		}
 		else if (obj != null && obj instanceof FileInputSplit && super.equals(obj)) {
 			FileInputSplit other = (FileInputSplit) obj;
-			
+
 			return this.start == other.start &&
 					this.length == other.length &&
 					(this.file == null ? other.file == null : (other.file != null && this.file.equals(other.file)));
@@ -115,7 +115,7 @@ public class FileInputSplit extends LocatableInputSplit {
 			return false;
 		}
 	}
-	
+
 	@Override
 	public String toString() {
 		return "[" + getSplitNumber() + "] " + file + ":" + start + "+" + length;

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
index 8b62659..f9794e6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java
@@ -20,7 +20,7 @@
 /**
  * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
+ * additional information regarding copyright ownership.
  */
 
 package org.apache.flink.core.fs;
@@ -30,56 +30,55 @@ import org.apache.flink.annotation.Public;
 /**
  * Interface that represents the client side information for a file
  * independent of the file system.
- * 
  */
 @Public
 public interface FileStatus {
 
 	/**
-	 * Return the length of this file
-	 * 
+	 * Return the length of this file.
+	 *
 	 * @return the length of this file
 	 */
 	long getLen();
 
 	/**
 	 *Get the block size of the file.
-	 * 
+	 *
 	 * @return the number of bytes
 	 */
 	long getBlockSize();
 
 	/**
 	 * Get the replication factor of a file.
-	 * 
+	 *
 	 * @return the replication factor of a file.
 	 */
 	short getReplication();
 
 	/**
 	 * Get the modification time of the file.
-	 * 
+	 *
 	 * @return the modification time of file in milliseconds since January 1, 1970 UTC.
 	 */
 	long getModificationTime();
 
 	/**
 	 * Get the access time of the file.
-	 * 
+	 *
 	 * @return the access time of file in milliseconds since January 1, 1970 UTC.
 	 */
 	long getAccessTime();
 
 	/**
 	 * Checks if this object represents a directory.
-	 * 
+	 *
 	 * @return <code>true</code> if this is a directory, <code>false</code> otherwise
 	 */
 	boolean isDir();
 
 	/**
 	 * Returns the corresponding Path to the FileStatus.
-	 * 
+	 *
 	 * @return the corresponding Path to the FileStatus
 	 */
 	Path getPath();

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 982e496..7a8245a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 /*
  * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -38,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -55,40 +55,40 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * distributed file systems, or local file systems. The abstraction by this file system is very simple,
  * and the set of available operations quite limited, to support the common denominator of a wide
  * range of file systems. For example, appending to or mutating existing files is not supported.
- * 
+ *
  * <p>Flink implements and supports some file system types directly (for example the default
  * machine-local file system). Other file system types are accessed by an implementation that bridges
  * to the suite of file systems supported by Hadoop (such as for example HDFS).
- * 
+ *
  * <h2>Scope and Purpose</h2>
- * 
- * The purpose of this abstraction is used to expose a common and well defined interface for
+ *
+ * <p>The purpose of this abstraction is used to expose a common and well defined interface for
  * access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing
  * state and recovery data) and by reusable built-in connectors (file sources / sinks).
- * 
+ *
  * <p>The purpose of this abstraction is <b>not</b> to give user programs an abstraction with
  * extreme flexibility and control across all possible file systems. That mission would be a folly,
  * as the differences in characteristics of even the most common file systems are already quite
  * large. It is expected that user programs that need specialized functionality of certain file systems
  * in their functions, operations, sources, or sinks instantiate the specialized file system adapters
  * directly.
- * 
+ *
  * <h2>Data Persistence Contract</h2>
- * 
- * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
+ *
+ * <p>The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data,
  * both for results of streaming applications and for fault tolerance and recovery. It is therefore
  * crucial that the persistence semantics of these streams are well defined.
- * 
+ *
  * <h3>Definition of Persistence Guarantees</h3>
- * 
- * Data written to an output stream is considered persistent, if two requirements are met:
- * 
+ *
+ * <p>Data written to an output stream is considered persistent, if two requirements are met:
+ *
  * <ol>
  *     <li><b>Visibility Requirement:</b> It must be guaranteed that all other processes, machines,
  *     virtual machines, containers, etc. that are able to access the file see the data consistently
  *     when given the absolute file path. This requirement is similar to the <i>close-to-open</i>
  *     semantics defined by POSIX, but restricted to the file itself (by its absolute path).</li>
- * 
+ *
  *     <li><b>Durability Requirement:</b> The file system's specific durability/persistence requirements
  *     must be met. These are specific to the particular file system. For example the
  *     {@link LocalFileSystem} does not provide any durability guarantees for crashes of both
@@ -101,14 +101,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * listing the directory contents) are not required to be complete for the data in the file stream
  * to be considered persistent. This relaxation is important for file systems where updates to
  * directory contents are only eventually consistent.
- * 
+ *
  * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes
  * once the call to {@link FSDataOutputStream#close()} returns.
  *
  * <h3>Examples</h3>
  *
  * <ul>
- *     <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once 
+ *     <li>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once
  *     it has been received and acknowledged by the file system, typically by having been replicated
  *     to a quorum of machines (<i>durability requirement</i>). In addition the absolute file path
  *     must be visible to all other machines that will potentially access the file (<i>visibility
@@ -125,12 +125,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>A <b>local file system</b> must support the POSIX <i>close-to-open</i> semantics.
  *     Because the local file system does not have any fault tolerance guarantees, no further
  *     requirements exist.
- * 
+ *
  *     <p>The above implies specifically that data may still be in the OS cache when considered
  *     persistent from the local file system's perspective. Crashes that cause the OS cache to loose
  *     data are considered fatal to the local machine and are not covered by the local file system's
  *     guarantees as defined by Flink.
- * 
+ *
  *     <p>That means that computed results, checkpoints, and savepoints that are written only to
  *     the local filesystem are not guaranteed to be recoverable from the local machine's failure,
  *     making local file systems unsuitable for production setups.</li>
@@ -138,14 +138,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <h2>Updating File Contents</h2>
  *
- * Many file systems either do not support overwriting contents of existing files at all, or do
+ * <p>Many file systems either do not support overwriting contents of existing files at all, or do
  * not support consistent visibility of the updated contents in that case. For that reason,
  * Flink's FileSystem does not support appending to existing files, or seeking within output streams
  * so that previously written data could be overwritten.
  *
  * <h2>Overwriting Files</h2>
  *
- * Overwriting files is in general possible. A file is overwritten by deleting it and creating
+ * <p>Overwriting files is in general possible. A file is overwritten by deleting it and creating
  * a new file. However, certain filesystems cannot make that change synchronously visible
  * to all parties that have access to the file.
  * For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only
@@ -154,29 +154,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>To avoid these consistency issues, the implementations of failure/recovery mechanisms in
  * Flink strictly avoid writing to the same file path more than once.
- * 
+ *
  * <h2>Thread Safety</h2>
- * 
- * Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem
+ *
+ * <p>Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem
  * is frequently shared across multiple threads in Flink and must be able to concurrently
  * create input/output streams and list file metadata.
- * 
+ *
  * <p>The {@link FSDataOutputStream} and {@link FSDataOutputStream} implementations are strictly
  * <b>not thread-safe</b>. Instances of the streams should also not be passed between threads
  * in between read or write operations, because there are no guarantees about the visibility of
  * operations across threads (many operations do not create memory fences).
- * 
+ *
  * <h2>Streams Safety Net</h2>
- * 
- * When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via
+ *
+ * <p>When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via
  * {@link Path#getFileSystem()}), the FileSystem instantiates a safety net for that FileSystem.
  * The safety net ensures that all streams created from the FileSystem are closed when the
  * application task finishes (or is canceled or failed). That way, the task's threads do not
  * leak connections.
- * 
+ *
  * <p>Internal runtime code can explicitly obtain a FileSystem that does not use the safety
  * net via {@link FileSystem#getUnguardedFileSystem(URI)}.
- * 
+ *
  * @see FSDataInputStream
  * @see FSDataOutputStream
  */
@@ -201,7 +201,7 @@ public abstract class FileSystem {
 
 	// ------------------------------------------------------------------------
 
-	/** Logger for all FileSystem work */
+	/** Logger for all FileSystem work. */
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
 
 	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
@@ -223,14 +223,13 @@ public abstract class FileSystem {
 	/** The default filesystem scheme to be used, configured during process-wide initialization.
 	 * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
 	private static URI DEFAULT_SCHEME;
-	
 
 	// ------------------------------------------------------------------------
 	//  Initialization
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Initializes the shared file system settings. 
+	 * Initializes the shared file system settings.
 	 *
 	 * <p>The given configuration is passed to each file system factory to initialize the respective
 	 * file systems. Because the configuration of file systems may be different subsequent to the call
@@ -351,7 +350,7 @@ public abstract class FileSystem {
 				}
 			}
 
-			// print a helpful pointer for malformed local URIs (happens a lot to new users) 
+			// print a helpful pointer for malformed local URIs (happens a lot to new users)
 			if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
 				String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();
 
@@ -382,7 +381,7 @@ public abstract class FileSystem {
 				}
 				catch (UnsupportedFileSystemSchemeException e) {
 					throw new UnsupportedFileSystemSchemeException(
-							"Could not find a file system implementation for scheme '" + uri.getScheme() + 
+							"Could not find a file system implementation for scheme '" + uri.getScheme() +
 									"'. The scheme is not directly supported by Flink and no Hadoop file " +
 									"system to support this scheme could be loaded.", e);
 				}
@@ -479,7 +478,7 @@ public abstract class FileSystem {
 	 * Return the number of bytes that large input files should be optimally be split into to minimize I/O time.
 	 *
 	 * @return the number of bytes that large input files should be optimally be split into to minimize I/O time
-	 * 
+	 *
 	 * @deprecated This value is no longer used and is meaningless.
 	 */
 	@Deprecated
@@ -539,7 +538,7 @@ public abstract class FileSystem {
 
 	/**
 	 * Opens an FSDataOutputStream at the indicated Path.
-	 * 
+	 *
 	 * <p>This method is deprecated, because most of its parameters are ignored by most file systems.
 	 * To control for example the replication factor and block size in the Hadoop Distributed File system,
 	 * make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
@@ -556,13 +555,13 @@ public abstract class FileSystem {
 	 *        required block replication for the file.
 	 * @param blockSize
 	 *        the size of the file blocks
-	 * 
+	 *
 	 * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
 	 *                     a file already exists at that path and the write mode indicates to not
 	 *                     overwrite the file.
-	 * 
+	 *
 	 * @deprecated Deprecated because not well supported across types of file systems.
-	 *             Control the behavior of specific file systems via configurations instead. 
+	 *             Control the behavior of specific file systems via configurations instead.
 	 */
 	@Deprecated
 	public FSDataOutputStream create(
@@ -583,11 +582,11 @@ public abstract class FileSystem {
 	 * @param overwrite
 	 *        if a file with this name already exists, then if true,
 	 *        the file will be overwritten, and if false an error will be thrown.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
 	 *                     a file already exists at that path and the write mode indicates to not
 	 *                     overwrite the file.
-	 * 
+	 *
 	 * @deprecated Use {@link #create(Path, WriteMode)} instead.
 	 */
 	@Deprecated
@@ -597,7 +596,7 @@ public abstract class FileSystem {
 
 	/**
 	 * Opens an FSDataOutputStream to a new file at the given path.
-	 * 
+	 *
 	 * <p>If the file already exists, the behavior depends on the given {@code WriteMode}.
 	 * If the mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an
 	 * exception.
@@ -605,7 +604,7 @@ public abstract class FileSystem {
 	 * @param f The file path to write to
 	 * @param overwriteMode The action to take if a file or directory already exists at the given path.
 	 * @return The stream to the new file at the target path.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
 	 *                     a file already exists at that path and the write mode indicates to not
 	 *                     overwrite the file.
@@ -674,7 +673,7 @@ public abstract class FileSystem {
 	 *     </ul>
 	 *   </li>
 	 * </ul>
-	 * 
+	 *
 	 * <p>Files contained in an existing directory are not deleted, because multiple instances of a
 	 * DataSinkTask might call this function at the same time and hence might perform concurrent
 	 * delete operations on the file system (possibly deleting output files of concurrently running tasks).
@@ -684,7 +683,7 @@ public abstract class FileSystem {
 	 * @param outPath Output path that should be prepared.
 	 * @param writeMode Write mode to consider.
 	 * @param createDirectory True, to initialize a directory at the given path, false to prepare space for a file.
-	 *    
+	 *
 	 * @return True, if the path was successfully prepared, false otherwise.
 	 * @throws IOException Thrown, if any of the file system access operations failed.
 	 */
@@ -708,7 +707,7 @@ public abstract class FileSystem {
 			// restore the interruption state
 			Thread.currentThread().interrupt();
 
-			// leave the method - we don't have the lock anyways 
+			// leave the method - we don't have the lock anyways
 			throw new IOException("The thread was interrupted while trying to initialize the output directory");
 		}
 
@@ -733,7 +732,7 @@ public abstract class FileSystem {
 					} else {
 						// file may not be overwritten
 						throw new IOException("File or directory already exists. Existing files and directories " +
-								"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " + 
+								"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
 								WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
 					}
 
@@ -748,7 +747,7 @@ public abstract class FileSystem {
 								delete(outPath, true);
 							}
 							catch (IOException e) {
-								throw new IOException("Could not remove existing directory '" + outPath + 
+								throw new IOException("Could not remove existing directory '" + outPath +
 										"' to allow overwrite by result file", e);
 							}
 						}
@@ -798,27 +797,27 @@ public abstract class FileSystem {
 	/**
 	 * Initializes output directories on distributed file systems according to the given write mode.
 	 *
-	 * WriteMode.NO_OVERWRITE &amp; parallel output:
+	 * <p>WriteMode.NO_OVERWRITE &amp; parallel output:
 	 *  - A directory is created if the output path does not exist.
 	 *  - An existing file or directory raises an exception.
 	 *
-	 * WriteMode.NO_OVERWRITE &amp; NONE parallel output:
+	 * <p>WriteMode.NO_OVERWRITE &amp; NONE parallel output:
 	 *  - An existing file or directory raises an exception.
 	 *
-	 * WriteMode.OVERWRITE &amp; parallel output:
+	 * <p>WriteMode.OVERWRITE &amp; parallel output:
 	 *  - A directory is created if the output path does not exist.
 	 *  - An existing directory and its content is deleted and a new directory is created.
 	 *  - An existing file is deleted and replaced by a new directory.
 	 *
-	 *  WriteMode.OVERWRITE &amp; NONE parallel output:
+	 *  <p>WriteMode.OVERWRITE &amp; NONE parallel output:
 	 *  - An existing file or directory is deleted and replaced by a new directory.
 	 *
 	 * @param outPath Output path that should be prepared.
 	 * @param writeMode Write mode to consider.
 	 * @param createDirectory True, to initialize a directory at the given path, false otherwise.
-	 *    
+	 *
 	 * @return True, if the path was successfully prepared, false otherwise.
-	 * 
+	 *
 	 * @throws IOException Thrown, if any of the file system access operations failed.
 	 */
 	public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
@@ -841,7 +840,7 @@ public abstract class FileSystem {
 			// restore the interruption state
 			Thread.currentThread().interrupt();
 
-			// leave the method - we don't have the lock anyways 
+			// leave the method - we don't have the lock anyways
 			throw new IOException("The thread was interrupted while trying to initialize the output directory");
 		}
 
@@ -850,13 +849,13 @@ public abstract class FileSystem {
 			if (exists(outPath)) {
 				// path exists, check write mode
 				switch(writeMode) {
-	
+
 				case NO_OVERWRITE:
 					// file or directory may not be overwritten
 					throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
 							WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
 								" mode to overwrite existing files and directories.");
-	
+
 				case OVERWRITE:
 					// output path exists. We delete it and all contained files in case of a directory.
 					try {
@@ -867,12 +866,12 @@ public abstract class FileSystem {
 						// this will be handled later.
 					}
 					break;
-	
+
 				default:
-					throw new IllegalArgumentException("Invalid write mode: "+writeMode);
+					throw new IllegalArgumentException("Invalid write mode: " + writeMode);
 				}
 			}
-	
+
 			if (createDirectory) {
 				// Output directory needs to be created
 				try {
@@ -881,10 +880,10 @@ public abstract class FileSystem {
 					}
 				} catch (IOException ioe) {
 					// Some other thread might already have created the directory.
-					// If - for some other reason - the directory could not be created  
+					// If - for some other reason - the directory could not be created
 					// and the path does not exist, this will be handled later.
 				}
-	
+
 				// double check that the output directory exists
 				return exists(outPath) && getFileStatus(outPath).isDir();
 			}
@@ -906,7 +905,7 @@ public abstract class FileSystem {
 	 * Aside from the {@link LocalFileSystem}, these file systems are loaded
 	 * via Java's service framework.
 	 *
-	 * @return A map from the file system scheme to corresponding file system factory. 
+	 * @return A map from the file system scheme to corresponding file system factory.
 	 */
 	private static HashMap<String, FileSystemFactory> loadFileSystems() {
 		final HashMap<String, FileSystemFactory> map = new HashMap<>();
@@ -935,7 +934,7 @@ public abstract class FileSystem {
 					// catching Throwable here to handle various forms of class loading
 					// and initialization errors
 					ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-					LOG.error("Failed to load a file systems via services", t);
+					LOG.error("Failed to load a file system via services", t);
 				}
 			}
 		}
@@ -948,7 +947,7 @@ public abstract class FileSystem {
 
 		return map;
 	}
-	
+
 	/**
 	 * Utility loader for the Hadoop file system factory.
 	 * We treat the Hadoop FS factory in a special way, because we use it as a catch
@@ -963,10 +962,12 @@ public abstract class FileSystem {
 		// first, see if the Flink runtime classes are available
 		final Class<? extends FileSystemFactory> factoryClass;
 		try {
-			factoryClass = Class.forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl).asSubclass(FileSystemFactory.class);
+			factoryClass = Class
+					.forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl)
+					.asSubclass(FileSystemFactory.class);
 		}
 		catch (ClassNotFoundException e) {
-			LOG.info("No Flink runtime dependency present. " + 
+			LOG.info("No Flink runtime dependency present. " +
 					"The extended set of supported File Systems via Hadoop is not available.");
 			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
 		}
@@ -1039,7 +1040,7 @@ public abstract class FileSystem {
 
 		@Override
 		public int hashCode() {
-			return 31 * scheme.hashCode() + 
+			return 31 * scheme.hashCode() +
 					(authority == null ? 17 : authority.hashCode());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
index 982da35..8a35471 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
@@ -58,4 +58,4 @@ public interface FileSystemFactory {
 	 * @throws IOException Thrown if the file system could not be instantiated.
 	 */
 	FileSystem create(URI fsUri) throws IOException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 53290ed..6398aa8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -22,26 +22,26 @@
 
 package org.apache.flink.core.fs;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.StringUtils;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
 /**
  * Names a file or directory in a {@link FileSystem}. Path strings use slash as
  * the directory separator. A path string is absolute if it begins with a slash.
  *
- * Tailing slashes are removed from the path.
+ * <p>Trailing slashes are removed from the path.
  */
 @Public
 public class Path implements IOReadableWritable, Serializable {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	/**
@@ -71,7 +71,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Constructs a path object from a given URI.
-	 * 
+	 *
 	 * @param uri
 	 *        the URI to construct the path object from
 	 */
@@ -81,7 +81,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Resolve a child path against a parent path.
-	 * 
+	 *
 	 * @param parent
 	 *        the parent path
 	 * @param child
@@ -93,7 +93,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Resolve a child path against a parent path.
-	 * 
+	 *
 	 * @param parent
 	 *        the parent path
 	 * @param child
@@ -105,7 +105,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Resolve a child path against a parent path.
-	 * 
+	 *
 	 * @param parent
 	 *        the parent path
 	 * @param child
@@ -117,7 +117,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Resolve a child path against a parent path.
-	 * 
+	 *
 	 * @param parent
 	 *        the parent path
 	 * @param child
@@ -168,7 +168,7 @@ public class Path implements IOReadableWritable, Serializable {
 	/**
 	 * Construct a path from a String. Path strings are URIs, but with unescaped
 	 * elements and some additional normalization.
-	 * 
+	 *
 	 * @param pathString
 	 *        the string to construct a path from
 	 */
@@ -214,7 +214,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Construct a Path from a scheme, an authority and a path string.
-	 * 
+	 *
 	 * @param scheme
 	 *        the scheme string
 	 * @param authority
@@ -229,7 +229,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Initializes a path object given the scheme, authority and path string.
-	 * 
+	 *
 	 * @param scheme
 	 *        the scheme string.
 	 * @param authority
@@ -247,7 +247,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Normalizes a path string.
-	 * 
+	 *
 	 * @param path
 	 *        the path string to normalize
 	 * @return the normalized path string
@@ -262,10 +262,10 @@ public class Path implements IOReadableWritable, Serializable {
 		path = path.replaceAll("/+", "/");
 
 		// remove tailing separator
-		if(!path.equals(SEPARATOR) &&         		// UNIX root path
+		if (!path.equals(SEPARATOR) &&              // UNIX root path
 				!path.matches("/\\p{Alpha}+:/") &&  // Windows root path
-				path.endsWith(SEPARATOR))
-		{
+				path.endsWith(SEPARATOR)) {
+
 			// remove tailing slash
 			path = path.substring(0, path.length() - SEPARATOR.length());
 		}
@@ -275,7 +275,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Converts the path object to a {@link URI}.
-	 * 
+	 *
 	 * @return the {@link URI} object converted from the path object
 	 */
 	public URI toUri() {
@@ -284,7 +284,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Returns the FileSystem that owns this Path.
-	 * 
+	 *
 	 * @return the FileSystem that owns this Path
 	 * @throws IOException
 	 *         thrown if the file system could not be retrieved
@@ -295,7 +295,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Checks if the directory of this path is absolute.
-	 * 
+	 *
 	 * @return <code>true</code> if the directory of this path is absolute, <code>false</code> otherwise
 	 */
 	public boolean isAbsolute() {
@@ -305,7 +305,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Returns the final component of this path, i.e., everything that follows the last separator.
-	 * 
+	 *
 	 * @return the final component of the path
 	 */
 	public String getName() {
@@ -325,7 +325,7 @@ public class Path implements IOReadableWritable, Serializable {
 	/**
 	 * Returns the parent of a path, i.e., everything that precedes the last separator
 	 * or <code>null</code> if at root.
-	 * 
+	 *
 	 * @return the parent of a path or <code>null</code> if at root.
 	 */
 	public Path getParent() {
@@ -348,7 +348,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Adds a suffix to the final name in the path.
-	 * 
+	 *
 	 * @param suffix The suffix to be added
 	 * @return the new path including the suffix
 	 */
@@ -381,7 +381,6 @@ public class Path implements IOReadableWritable, Serializable {
 		return buffer.toString();
 	}
 
-
 	@Override
 	public boolean equals(Object o) {
 		if (!(o instanceof Path)) {
@@ -391,7 +390,6 @@ public class Path implements IOReadableWritable, Serializable {
 		return this.uri.equals(that.uri);
 	}
 
-
 	@Override
 	public int hashCode() {
 		return uri.hashCode();
@@ -404,7 +402,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Returns the number of elements in this path.
-	 * 
+	 *
 	 * @return the number of elements in this path
 	 */
 	public int depth() {
@@ -420,7 +418,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	/**
 	 * Returns a qualified path object.
-	 * 
+	 *
 	 * @param fs
 	 *        the FileSystem that should be used to obtain the current working directory
 	 * @return the qualified path object
@@ -479,7 +477,6 @@ public class Path implements IOReadableWritable, Serializable {
 		}
 	}
 
-
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		if (uri == null) {
@@ -516,7 +513,7 @@ public class Path implements IOReadableWritable, Serializable {
 	 *        the path to check
 	 * @param slashed
 	 *         true to indicate the first character of the string is a slash, false otherwise
-	 * 
+	 *
 	 * @return <code>true</code> if the path string contains a windows drive letter, false otherwise
 	 */
 	private boolean hasWindowsDrive(String path, boolean slashed) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
index 234b49f..c2cb2d5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
@@ -20,11 +20,9 @@ package org.apache.flink.core.fs;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e156f7/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
index 9f100ef..e1c5a7d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java
@@ -27,6 +27,4 @@ import java.io.Closeable;
  * {@link WrappingProxy} for {@link Closeable} that is also closeable.
  */
 @Internal
-public interface WrappingProxyCloseable<T extends Closeable> extends Closeable, WrappingProxy<T> {
-
-}
+public interface WrappingProxyCloseable<T extends Closeable> extends Closeable, WrappingProxy<T> {}


[2/3] flink git commit: [FLINK-8125] [core] Introduce limiting of outgoing file system connections

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
new file mode 100644
index 0000000..509b4ae
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java
@@ -0,0 +1,742 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link LimitedConnectionsFileSystem}.
+ */
+public class LimitedConnectionsFileSystemTest {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testConstructionNumericOverflow() {
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				Integer.MAX_VALUE,  // unlimited total
+				Integer.MAX_VALUE,  // unlimited outgoing
+				Integer.MAX_VALUE,  // unlimited incoming
+				Long.MAX_VALUE - 1, // long timeout, close to overflow
+				Long.MAX_VALUE - 1); // long timeout, close to overflow
+
+		assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenStreamsTotal());
+		assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenOutputStreams());
+		assertEquals(Integer.MAX_VALUE, limitedFs.getMaxNumOpenInputStreams());
+
+		assertTrue(limitedFs.getStreamOpenTimeout() > 0);
+		assertTrue(limitedFs.getStreamInactivityTimeout() > 0);
+	}
+
+	@Test
+	public void testLimitingOutputStreams() throws Exception {
+		final int maxConcurrentOpen = 2;
+		final int numThreads = 61;
+
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				Integer.MAX_VALUE,  // unlimited total
+				maxConcurrentOpen,  // limited outgoing
+				Integer.MAX_VALUE,  // unlimited incoming
+				0,
+				0);
+
+		final WriterThread[] threads = new WriterThread[numThreads];
+		for (int i = 0; i < numThreads; i++) {
+			Path path = new Path(tempFolder.newFile().toURI());
+			threads[i] = new WriterThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+		}
+
+		for (WriterThread t : threads) {
+			t.start();
+		}
+
+		for (WriterThread t : threads) {
+			t.sync();
+		}
+	}
+
+	@Test
+	public void testLimitingInputStreams() throws Exception {
+		final int maxConcurrentOpen = 2;
+		final int numThreads = 61;
+
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				Integer.MAX_VALUE,  // unlimited total
+				Integer.MAX_VALUE,  // unlimited outgoing
+				maxConcurrentOpen,  // limited incoming
+				0,
+				0);
+
+		final Random rnd = new Random();
+
+		final ReaderThread[] threads = new ReaderThread[numThreads];
+		for (int i = 0; i < numThreads; i++) {
+			File file = tempFolder.newFile();
+			createRandomContents(file, rnd);
+			Path path = new Path(file.toURI());
+			threads[i] = new ReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+		}
+
+		for (ReaderThread t : threads) {
+			t.start();
+		}
+
+		for (ReaderThread t : threads) {
+			t.sync();
+		}
+	}
+
+	@Test
+	public void testLimitingMixedStreams() throws Exception {
+		final int maxConcurrentOpen = 2;
+		final int numThreads = 61;
+
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				maxConcurrentOpen);  // limited total
+
+		final Random rnd = new Random();
+
+		final CheckedThread[] threads = new CheckedThread[numThreads];
+		for (int i = 0; i < numThreads; i++) {
+			File file = tempFolder.newFile();
+			Path path = new Path(file.toURI());
+
+			if (rnd.nextBoolean()) {
+				// reader thread
+				createRandomContents(file, rnd);
+				threads[i] = new ReaderThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+			}
+			else {
+				threads[i] = new WriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+			}
+		}
+
+		for (CheckedThread t : threads) {
+			t.start();
+		}
+
+		for (CheckedThread t : threads) {
+			t.sync();
+		}
+	}
+
+	@Test
+	public void testOpenTimeoutOutputStreams() throws Exception {
+		final long openTimeout = 50L;
+		final int maxConcurrentOpen = 2;
+
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				maxConcurrentOpen, // limited total
+				openTimeout,       // small opening timeout
+				0L);               // infinite inactivity timeout
+
+		// create the threads that block all streams
+		final BlockingWriterThread[] threads = new BlockingWriterThread[maxConcurrentOpen];
+		for (int i = 0; i < maxConcurrentOpen; i++) {
+			Path path = new Path(tempFolder.newFile().toURI());
+			threads[i] = new BlockingWriterThread(limitedFs, path, Integer.MAX_VALUE, maxConcurrentOpen);
+			threads[i].start();
+		}
+
+		// wait until all are open
+		while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
+			Thread.sleep(1);
+		}
+
+		// try to open another stream
+		try {
+			limitedFs.create(new Path(tempFolder.newFile().toURI()), WriteMode.OVERWRITE);
+			fail("this should have timed out");
+		}
+		catch (IOException e) {
+			// expected
+		}
+
+		// clean shutdown
+		for (BlockingWriterThread t : threads) {
+			t.wakeup();
+			t.sync();
+		}
+	}
+
+	@Test
+	public void testOpenTimeoutInputStreams() throws Exception {
+		final long openTimeout = 50L;
+		final int maxConcurrentOpen = 2;
+
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				maxConcurrentOpen, // limited total
+				openTimeout,       // small opening timeout
+				0L);               // infinite inactivity timeout
+
+		// create the threads that block all streams
+		final Random rnd = new Random();
+		final BlockingReaderThread[] threads = new BlockingReaderThread[maxConcurrentOpen];
+		for (int i = 0; i < maxConcurrentOpen; i++) {
+			File file = tempFolder.newFile();
+			createRandomContents(file, rnd);
+			Path path = new Path(file.toURI());
+			threads[i] = new BlockingReaderThread(limitedFs, path, maxConcurrentOpen, Integer.MAX_VALUE);
+			threads[i].start();
+		}
+
+		// wait until all are open
+		while (limitedFs.getTotalNumberOfOpenStreams() < maxConcurrentOpen) {
+			Thread.sleep(1);
+		}
+
+		// try to open another stream
+		File file = tempFolder.newFile();
+		createRandomContents(file, rnd);
+		try {
+			limitedFs.open(new Path(file.toURI()));
+			fail("this should have timed out");
+		}
+		catch (IOException e) {
+			// expected
+		}
+
+		// clean shutdown
+		for (BlockingReaderThread t : threads) {
+			t.wakeup();
+			t.sync();
+		}
+	}
+
+	/**
+	 * Verifies that output streams that make no progress are force-closed after the
+	 * configured 50 ms inactivity timeout, so that the non-blocked writer threads
+	 * can eventually acquire a stream and finish.
+	 */
+	@Test
+	public void testTerminateStalledOutputStreams() throws Exception {
+		final int maxConcurrentOpen = 2;
+		final int numThreads = 20;
+
+		// this testing file system has a 50 ms stream inactivity timeout
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				Integer.MAX_VALUE,  // no limit on total streams
+				maxConcurrentOpen,  // limit on output streams
+				Integer.MAX_VALUE,  // no limit on input streams
+				0,                  // no stream open timeout
+				50);               // inactivity timeout of 50 ms
+
+		final WriterThread[] threads = new WriterThread[numThreads];
+		final BlockingWriterThread[] blockers = new BlockingWriterThread[numThreads];
+
+		for (int i = 0; i < numThreads; i++) {
+			Path path1 = new Path(tempFolder.newFile().toURI());
+			Path path2 = new Path(tempFolder.newFile().toURI());
+
+			threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+			blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+		}
+
+		// start normal and blocker threads
+		for (int i = 0; i < numThreads; i++) {
+			blockers[i].start();
+			threads[i].start();
+		}
+
+		// all normal threads need to be able to finish because
+		// the blockers eventually time out
+		for (WriterThread t : threads) {
+			try {
+				t.sync();
+			} catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+				// also the regular threads may occasionally get a timeout on
+				// slower test machines because we set very aggressive timeouts
+				// to reduce the test time
+			}
+		}
+
+		// unblock all the blocking threads
+		for (BlockingThread t : blockers) {
+			t.wakeup();
+		}
+		for (BlockingThread t : blockers) {
+			try {
+				t.sync();
+			}
+			catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+		}
+	}
+
+	/**
+	 * Verifies that input streams that make no progress are force-closed after the
+	 * configured 50 ms inactivity timeout, so that the non-blocked reader threads
+	 * can eventually acquire a stream and finish.
+	 */
+	@Test
+	public void testTerminateStalledInputStreams() throws Exception {
+		final int maxConcurrentOpen = 2;
+		final int numThreads = 20;
+
+		// this testing file system has a 50 ms stream inactivity timeout
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				Integer.MAX_VALUE,  // no limit on total streams
+				Integer.MAX_VALUE,  // no limit on output streams
+				maxConcurrentOpen,  // limit on input streams
+				0,                  // no stream open timeout
+				50);               // inactivity timeout of 50 ms
+
+		final Random rnd = new Random();
+
+		final ReaderThread[] threads = new ReaderThread[numThreads];
+		final BlockingReaderThread[] blockers = new BlockingReaderThread[numThreads];
+
+		for (int i = 0; i < numThreads; i++) {
+			File file1 = tempFolder.newFile();
+			File file2 = tempFolder.newFile();
+
+			createRandomContents(file1, rnd);
+			createRandomContents(file2, rnd);
+
+			Path path1 = new Path(file1.toURI());
+			Path path2 = new Path(file2.toURI());
+
+			threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+			blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+		}
+
+		// start normal and blocker threads
+		for (int i = 0; i < numThreads; i++) {
+			blockers[i].start();
+			threads[i].start();
+		}
+
+		// all normal threads need to be able to finish because
+		// the blockers eventually time out
+		for (ReaderThread t : threads) {
+			try {
+				t.sync();
+			} catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+				// also the regular threads may occasionally get a timeout on
+				// slower test machines because we set very aggressive timeouts
+				// to reduce the test time
+			}
+		}
+
+		// unblock all the blocking threads
+		for (BlockingThread t : blockers) {
+			t.wakeup();
+		}
+		for (BlockingThread t : blockers) {
+			try {
+				t.sync();
+			}
+			catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+		}
+	}
+
+	/**
+	 * Verifies that stalled streams are force-closed under a total (shared input and
+	 * output) stream limit, with a random mix of reader and writer threads.
+	 */
+	@Test
+	public void testTerminateStalledMixedStreams() throws Exception {
+		final int maxConcurrentOpen = 2;
+		final int numThreads = 20;
+
+		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(),
+				maxConcurrentOpen,  // limited total
+				0L,                 // no opening timeout
+				50L);               // inactivity timeout of 50 ms
+
+		final Random rnd = new Random();
+
+		final CheckedThread[] threads = new CheckedThread[numThreads];
+		final BlockingThread[] blockers = new BlockingThread[numThreads];
+
+		for (int i = 0; i < numThreads; i++) {
+			File file1 = tempFolder.newFile();
+			File file2 = tempFolder.newFile();
+			Path path1 = new Path(file1.toURI());
+			Path path2 = new Path(file2.toURI());
+
+			// randomly choose a reader or a writer pair for this slot
+			if (rnd.nextBoolean()) {
+				createRandomContents(file1, rnd);
+				createRandomContents(file2, rnd);
+				threads[i] = new ReaderThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+				blockers[i] = new BlockingReaderThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+			}
+			else {
+				threads[i] = new WriterThread(limitedFs, path1, maxConcurrentOpen, Integer.MAX_VALUE);
+				blockers[i] = new BlockingWriterThread(limitedFs, path2, maxConcurrentOpen, Integer.MAX_VALUE);
+			}
+		}
+
+		// start normal and blocker threads
+		for (int i = 0; i < numThreads; i++) {
+			blockers[i].start();
+			threads[i].start();
+		}
+
+		// all normal threads need to be able to finish because
+		// the blockers eventually time out
+		for (CheckedThread t : threads) {
+			try {
+				t.sync();
+			} catch (LimitedConnectionsFileSystem.StreamTimeoutException e) {
+				// also the regular threads may occasionally get a timeout on
+				// slower test machines because we set very aggressive timeouts
+				// to reduce the test time
+			}
+		}
+
+		// unblock all the blocking threads
+		for (BlockingThread t : blockers) {
+			t.wakeup();
+		}
+		for (BlockingThread t : blockers) {
+			try {
+				t.sync();
+			}
+			catch (LimitedConnectionsFileSystem.StreamTimeoutException ignored) {}
+		}
+	}
+
+	/**
+	 * Verifies that streams whose opening fails with an exception do not leak:
+	 * the open-stream counters must be back at zero after the failed attempts.
+	 */
+	@Test
+	public void testFailingStreamsUnregister() throws Exception {
+		final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(new FailFs(), 1);
+
+		// sanity check: nothing open before we start
+		assertEquals(0, fs.getNumberOfOpenInputStreams());
+		assertEquals(0, fs.getNumberOfOpenOutputStreams());
+		assertEquals(0, fs.getTotalNumberOfOpenStreams());
+
+		try {
+			fs.open(new Path(tempFolder.newFile().toURI()));
+			fail("this is expected to fail with an exception");
+		} catch (IOException e) {
+			// expected
+		}
+
+		try {
+			fs.create(new Path(tempFolder.newFile().toURI()), WriteMode.NO_OVERWRITE);
+			fail("this is expected to fail with an exception");
+		} catch (IOException e) {
+			// expected
+		}
+
+		// the failed open/create attempts must not have left any stream registered
+		assertEquals(0, fs.getNumberOfOpenInputStreams());
+		assertEquals(0, fs.getNumberOfOpenOutputStreams());
+		assertEquals(0, fs.getTotalNumberOfOpenStreams());
+	}
+
+	/**
+	 * Tests that a slowly written output stream is not accidentally closed too aggressively, due to
+	 * a wrong initialization of the timestamps or bytes written that mark when the last progress was checked.
+	 */
+	@Test
+	public void testSlowOutputStreamNotClosed() throws Exception {
+		// inactivity timeout of 1000 ms, much larger than the 5 ms write pauses below
+		final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
+
+		// some competing threads
+		final Random rnd = new Random();
+		final ReaderThread[] threads = new ReaderThread[10];
+		for (int i = 0; i < threads.length; i++) {
+			File file = tempFolder.newFile();
+			createRandomContents(file, rnd);
+			Path path = new Path(file.toURI());
+			threads[i] = new ReaderThread(fs, path, 1, Integer.MAX_VALUE);
+		}
+
+		// open the stream we test
+		try (FSDataOutputStream out = fs.create(new Path(tempFolder.newFile().toURI()), WriteMode.OVERWRITE)) {
+
+			// start the other threads that will try to shoot this stream down
+			for (ReaderThread t : threads) {
+				t.start();
+			}
+
+			// write to the stream slowly.
+			Thread.sleep(5);
+			for (int bytesLeft = 50; bytesLeft > 0; bytesLeft--) {
+				out.write(bytesLeft);
+				Thread.sleep(5);
+			}
+		}
+
+		// wait for clean shutdown
+		for (ReaderThread t : threads) {
+			t.sync();
+		}
+	}
+
+	/**
+	 * Tests that a slowly read input stream is not accidentally closed too aggressively, due to
+	 * a wrong initialization of the timestamps or bytes read that mark when the last progress was checked.
+	 */
+	@Test
+	public void testSlowInputStreamNotClosed() throws Exception {
+		final File file = tempFolder.newFile();
+		createRandomContents(file, new Random(), 50);
+
+		// inactivity timeout of 1000 ms, much larger than the 5 ms read pauses below
+		final LimitedConnectionsFileSystem fs = new LimitedConnectionsFileSystem(
+				LocalFileSystem.getSharedInstance(), 1, 0L, 1000L);
+
+		// some competing threads
+		final WriterThread[] threads = new WriterThread[10];
+		for (int i = 0; i < threads.length; i++) {
+			Path path = new Path(tempFolder.newFile().toURI());
+			threads[i] = new WriterThread(fs, path, 1, Integer.MAX_VALUE);
+		}
+
+		// open the stream we test
+		try (FSDataInputStream in = fs.open(new Path(file.toURI()))) {
+
+			// start the other threads that will try to shoot this stream down
+			for (WriterThread t : threads) {
+				t.start();
+			}
+
+			// read the stream slowly.
+			Thread.sleep(5);
+			while (in.read() != -1) {
+				Thread.sleep(5);
+			}
+		}
+
+		// wait for clean shutdown
+		for (WriterThread t : threads) {
+			t.sync();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+
+	/** Fills the given file with between 1 and 10,000 random bytes. */
+	private void createRandomContents(File file, Random rnd) throws IOException {
+		createRandomContents(file, rnd, rnd.nextInt(10000) + 1);
+	}
+
+	/** Fills the given file with exactly {@code size} random bytes, overwriting its contents. */
+	private void createRandomContents(File file, Random rnd, int size) throws IOException {
+		final byte[] data = new byte[size];
+		rnd.nextBytes(data);
+
+		try (FileOutputStream fos = new FileOutputStream(file)) {
+			fos.write(data);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Testing threads
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A thread that writes a random amount of data to a file, asserting that the
+	 * configured stream limits are respected while its output stream is open.
+	 */
+	private static final class WriterThread extends CheckedThread {
+
+		private final LimitedConnectionsFileSystem fs;
+
+		private final Path path;
+
+		// maximum number of output streams expected to be open concurrently
+		private final int maxConcurrentOutputStreams;
+
+		// maximum total number of streams expected to be open concurrently
+		private final int maxConcurrentStreamsTotal;
+
+		WriterThread(
+				LimitedConnectionsFileSystem fs,
+				Path path,
+				int maxConcurrentOutputStreams,
+				int maxConcurrentStreamsTotal) {
+
+			this.fs = fs;
+			this.path = path;
+			this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
+			this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+		}
+
+		@Override
+		public void go() throws Exception {
+
+			try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
+				// the limits must hold while this stream is open
+				assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
+				assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+				final Random rnd = new Random();
+				final byte[] data = new byte[rnd.nextInt(10000) + 1];
+				rnd.nextBytes(data);
+				stream.write(data);
+			}
+		}
+	}
+
+	/**
+	 * A thread that reads a file to the end, asserting that the configured
+	 * stream limits are respected while its input stream is open.
+	 */
+	private static final class ReaderThread extends CheckedThread {
+
+		private final LimitedConnectionsFileSystem fs;
+
+		private final Path path;
+
+		// maximum number of input streams expected to be open concurrently
+		private final int maxConcurrentInputStreams;
+
+		// maximum total number of streams expected to be open concurrently
+		private final int maxConcurrentStreamsTotal;
+
+		ReaderThread(
+				LimitedConnectionsFileSystem fs,
+				Path path,
+				int maxConcurrentInputStreams,
+				int maxConcurrentStreamsTotal) {
+
+			this.fs = fs;
+			this.path = path;
+			this.maxConcurrentInputStreams = maxConcurrentInputStreams;
+			this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+		}
+
+		@Override
+		public void go() throws Exception {
+
+			try (FSDataInputStream stream = fs.open(path)) {
+				// the limits must hold while this stream is open
+				assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
+				assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+				final byte[] readBuffer = new byte[4096];
+
+				//noinspection StatementWithEmptyBody
+				while (stream.read(readBuffer) != -1) {}
+			}
+		}
+	}
+
+	/**
+	 * Base class for threads that hold their stream open and block until
+	 * explicitly released via {@link #wakeup()}.
+	 */
+	private static abstract class BlockingThread extends CheckedThread {
+
+		private final OneShotLatch waiter = new OneShotLatch();
+
+		/** Blocks until {@link #wakeup()} has been called. */
+		public void waitTillWokenUp() throws InterruptedException {
+			waiter.await();
+		}
+
+		/** Releases a thread blocked in {@link #waitTillWokenUp()}. */
+		public void wakeup() {
+			waiter.trigger();
+		}
+	}
+
+	/**
+	 * A writer thread that keeps its output stream open (blocking) until woken up,
+	 * so that the stream may be force-closed by the inactivity timeout in the meantime.
+	 */
+	private static final class BlockingWriterThread extends BlockingThread {
+
+		private final LimitedConnectionsFileSystem fs;
+
+		private final Path path;
+
+		// maximum number of output streams expected to be open concurrently
+		private final int maxConcurrentOutputStreams;
+
+		// maximum total number of streams expected to be open concurrently
+		private final int maxConcurrentStreamsTotal;
+
+		BlockingWriterThread(
+				LimitedConnectionsFileSystem fs,
+				Path path,
+				int maxConcurrentOutputStreams,
+				int maxConcurrentStreamsTotal) {
+
+			this.fs = fs;
+			this.path = path;
+			this.maxConcurrentOutputStreams = maxConcurrentOutputStreams;
+			this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+		}
+
+		@Override
+		public void go() throws Exception {
+
+			try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
+				// the limits must hold while this stream is open
+				assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
+				assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+				final Random rnd = new Random();
+				final byte[] data = new byte[rnd.nextInt(10000) + 1];
+				rnd.nextBytes(data);
+				stream.write(data);
+
+				waitTillWokenUp();
+
+				// try to write one more thing, which might/should fail with an I/O exception
+				stream.write(rnd.nextInt());
+			}
+		}
+	}
+
+	/**
+	 * A reader thread that keeps its input stream open (blocking) until woken up,
+	 * so that the stream may be force-closed by the inactivity timeout in the meantime.
+	 */
+	private static final class BlockingReaderThread extends BlockingThread {
+
+		private final LimitedConnectionsFileSystem fs;
+
+		private final Path path;
+
+		// maximum number of input streams expected to be open concurrently
+		private final int maxConcurrentInputStreams;
+
+		// maximum total number of streams expected to be open concurrently
+		private final int maxConcurrentStreamsTotal;
+
+		BlockingReaderThread(
+				LimitedConnectionsFileSystem fs,
+				Path path,
+				int maxConcurrentInputStreams,
+				int maxConcurrentStreamsTotal) {
+
+			this.fs = fs;
+			this.path = path;
+			this.maxConcurrentInputStreams = maxConcurrentInputStreams;
+			this.maxConcurrentStreamsTotal = maxConcurrentStreamsTotal;
+		}
+
+		@Override
+		public void go() throws Exception {
+
+			try (FSDataInputStream stream = fs.open(path)) {
+				// the limits must hold while this stream is open
+				assertTrue(fs.getNumberOfOpenInputStreams() <= maxConcurrentInputStreams);
+				assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
+
+				// read all but the last byte, so the stream stays open and "in use"
+				final byte[] readBuffer = new byte[(int) fs.getFileStatus(path).getLen() - 1];
+				assertTrue(stream.read(readBuffer) != -1);
+
+				waitTillWokenUp();
+
+				// try to read one more thing, which might/should fail with an I/O exception
+				//noinspection ResultOfMethodCallIgnored
+				stream.read();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  failing file system
+	// ------------------------------------------------------------------------
+
+	/** A file system on which every attempt to create or open a stream fails with an {@link IOException}. */
+	private static class FailFs extends LocalFileSystem {
+
+		@Override
+		public FSDataOutputStream create(Path filePath, WriteMode overwrite) throws IOException {
+			throw new IOException("test exception");
+		}
+
+		@Override
+		public FSDataInputStream open(Path f) throws IOException {
+			throw new IOException("test exception");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
index 50e64e1..2444c65 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
@@ -19,7 +19,10 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
 import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
 import org.apache.flink.runtime.util.HadoopUtils;
 
@@ -63,7 +66,7 @@ public class HadoopFsFactory implements FileSystemFactory {
 	}
 
 	@Override
-	public HadoopFileSystem create(URI fsUri) throws IOException {
+	public FileSystem create(URI fsUri) throws IOException {
 		checkNotNull(fsUri, "fsUri");
 
 		final String scheme = fsUri.getScheme();
@@ -162,8 +165,15 @@ public class HadoopFsFactory implements FileSystemFactory {
 				throw new IOException(message, e);
 			}
 
-			// all good, return the file system
-			return new HadoopFileSystem(hadoopFs);
+			HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);
+
+			// create the Flink file system, optionally limiting the open connections
+			if (flinkConfig != null) {
+				return limitIfConfigured(fs, scheme, flinkConfig);
+			}
+			else {
+				return fs;
+			}
 		}
 		catch (ReflectiveOperationException | LinkageError e) {
 			throw new UnsupportedFileSystemSchemeException("Cannot support file system for '" + fsUri.getScheme() +
@@ -183,4 +193,23 @@ public class HadoopFsFactory implements FileSystemFactory {
 				"(like for example HDFS NameNode address/port or S3 host). " +
 				"The attempt to use a configured default authority failed: ";
 	}
+
+	/**
+	 * Wraps the given file system in a {@link LimitedConnectionsFileSystem} if any
+	 * connection-limiting settings are configured for the given scheme; otherwise
+	 * returns the file system unchanged.
+	 */
+	private static FileSystem limitIfConfigured(HadoopFileSystem fs, String scheme, Configuration config) {
+		final ConnectionLimitingSettings limitSettings = ConnectionLimitingSettings.fromConfig(config, scheme);
+
+		// decorate only if any limit is configured
+		if (limitSettings == null) {
+			// no limit configured
+			return fs;
+		}
+		else {
+			return new LimitedConnectionsFileSystem(
+					fs,
+					limitSettings.limitTotal,
+					limitSettings.limitOutput,
+					limitSettings.limitInput,
+					limitSettings.streamOpenTimeout,
+					limitSettings.streamInactivityTimeout);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
index 1f5c932..4b7592d 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.fs.hdfs;
 
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -39,7 +40,7 @@ public class HadoopFsFactoryTest extends TestLogger {
 		final URI uri = URI.create("hdfs://localhost:12345/");
 
 		HadoopFsFactory factory = new HadoopFsFactory();
-		HadoopFileSystem fs = factory.create(uri);
+		FileSystem fs = factory.create(uri);
 
 		assertEquals(uri.getScheme(), fs.getUri().getScheme());
 		assertEquals(uri.getAuthority(), fs.getUri().getAuthority());

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
new file mode 100644
index 0000000..8ab5419
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test that the Hadoop file system wrapper correctly picks up connection limiting
+ * settings for the correct file systems.
+ */
+public class LimitedConnectionsConfigurationTest {
+
+	@Rule
+	public final TemporaryFolder tempDir = new TemporaryFolder();
+
+	/**
+	 * Verifies that connection-limiting settings configured for one scheme ("hdfs")
+	 * apply only to that scheme's file system and not to others ("ftp").
+	 */
+	@Test
+	public void testConfiguration() throws Exception {
+
+		// nothing configured, we should get a regular file system
+		FileSystem hdfs = FileSystem.get(URI.create("hdfs://localhost:12345/a/b/c"));
+		FileSystem ftpfs = FileSystem.get(URI.create("ftp://localhost:12345/a/b/c"));
+
+		assertFalse(hdfs instanceof LimitedConnectionsFileSystem);
+		assertFalse(ftpfs instanceof LimitedConnectionsFileSystem);
+
+		// configure some limits for the "hdfs" scheme, which should cause it to be limited
+
+		final Configuration config = new Configuration();
+		config.setInteger("fs.hdfs.limit.total", 40);
+		config.setInteger("fs.hdfs.limit.input", 39);
+		config.setInteger("fs.hdfs.limit.output", 38);
+		config.setInteger("fs.hdfs.limit.timeout", 23456);
+		config.setInteger("fs.hdfs.limit.stream-timeout", 34567);
+
+		try {
+			FileSystem.initialize(config);
+
+			hdfs = FileSystem.get(URI.create("hdfs://localhost:12345/a/b/c"));
+			ftpfs = FileSystem.get(URI.create("ftp://localhost:12345/a/b/c"));
+
+			// only the configured scheme (hdfs) must be wrapped
+			assertTrue(hdfs instanceof LimitedConnectionsFileSystem);
+			assertFalse(ftpfs instanceof LimitedConnectionsFileSystem);
+
+			LimitedConnectionsFileSystem limitedFs = (LimitedConnectionsFileSystem) hdfs;
+			assertEquals(40, limitedFs.getMaxNumOpenStreamsTotal());
+			assertEquals(39, limitedFs.getMaxNumOpenInputStreams());
+			assertEquals(38, limitedFs.getMaxNumOpenOutputStreams());
+			assertEquals(23456, limitedFs.getStreamOpenTimeout());
+			assertEquals(34567, limitedFs.getStreamInactivityTimeout());
+		}
+		finally {
+			// clear all settings
+			FileSystem.initialize(new Configuration());
+		}
+	}
+}


[3/3] flink git commit: [FLINK-8125] [core] Introduce limiting of outgoing file system connections

Posted by se...@apache.org.
[FLINK-8125] [core] Introduce limiting of outgoing file system connections


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a11e2cf0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a11e2cf0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a11e2cf0

Branch: refs/heads/release-1.4
Commit: a11e2cf0b1f37d3ef22e1978e89928fa374960db
Parents: b5e156f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 8 23:57:04 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 24 10:47:33 2017 +0100

----------------------------------------------------------------------
 docs/ops/filesystems.md                         |  126 ++
 .../flink/configuration/ConfigConstants.java    |    5 +-
 .../apache/flink/configuration/CoreOptions.java |   57 +
 .../core/fs/ConnectionLimitingFactory.java      |   95 ++
 .../org/apache/flink/core/fs/FileSystem.java    |   46 +-
 .../core/fs/LimitedConnectionsFileSystem.java   | 1114 ++++++++++++++++++
 .../core/fs/local/LocalFileSystemFactory.java   |    1 -
 .../util/function/SupplierWithException.java    |   38 +
 .../FilesystemSchemeConfigTest.java             |    6 +-
 .../fs/LimitedConnectionsConfigurationTest.java |   84 ++
 ...itedConnectionsFileSystemDelegationTest.java |  241 ++++
 .../fs/LimitedConnectionsFileSystemTest.java    |  742 ++++++++++++
 .../flink/runtime/fs/hdfs/HadoopFsFactory.java  |   35 +-
 .../runtime/fs/hdfs/HadoopFsFactoryTest.java    |    3 +-
 .../LimitedConnectionsConfigurationTest.java    |   84 ++
 15 files changed, 2654 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/docs/ops/filesystems.md
----------------------------------------------------------------------
diff --git a/docs/ops/filesystems.md b/docs/ops/filesystems.md
new file mode 100644
index 0000000..5b2a1e7
--- /dev/null
+++ b/docs/ops/filesystems.md
@@ -0,0 +1,126 @@
+---
+title: "File Systems"
+nav-parent_id: ops
+nav-pos: 12
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This page provides details on setting up and configuring distributed file systems for use with Flink.
+
+## Flink's File System Support
+
+Flink uses file systems both as a source and sink in streaming/batch applications, and as a target for checkpointing.
+These file systems can, for example, be *Unix/Windows file systems*, *HDFS*, or even object stores like *S3*.
+
+The file system used for a specific file is determined by the file URI's scheme. For example `file:///home/user/text.txt` refers to
+a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` refers to a file in a specific HDFS cluster.
+
+File systems are represented via the `org.apache.flink.core.fs.FileSystem` class, which captures the ways to access and modify
+files and objects in that file system. FileSystem instances are instantiated once per process and then cached / pooled, to
+avoid configuration overhead per stream creation, and to enforce certain constraints, like connection/stream limits.
+
+### Built-in File Systems
+
+Flink directly implements the following file systems:
+
+  - **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine, 
+including any NFS or SAN that is mounted into that local file system.
+
+  - **S3**: Flink directly provides file systems to talk to Amazon S3, registered under the scheme *"s3://"*.
+There are two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`, based on code from the [Presto project](https://prestodb.io/)
+and the [Hadoop Project](https://hadoop.apache.org/). Both implementations are self-contained with no dependency footprint.
+To use those when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-s3-fs-presto:{{ site.version }}` or `org.apache.flink:flink-s3-fs-hadoop:{{ site.version }}`).
+When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
+See [AWS setup](deployment/aws.html) for details.
+
+  - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
+
+### HDFS and Hadoop File System support 
+
+For a scheme where Flink does not implement a file system itself, Flink will try to use Hadoop to instantiate a file system for the respective scheme.
+All Hadoop file systems are automatically available once `flink-runtime` and the relevant Hadoop libraries are in classpath.
+
+That way, Flink seamlessly supports all Hadoop file systems, and all Hadoop-compatible file systems (HCFS), for example:
+
+  - **hdfs**
+  - **ftp**
+  - **s3n** and **s3a**
+  - **har**
+  - ...
+
+
+## Common File System configurations
+
+The following configuration settings exist across different file systems:
+
+#### Default File System
+
+If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used.
+
+~~~
+fs.default-scheme: <default-fs>
+~~~
+
+For example, if the default file system is configured as `fs.default-scheme: hdfs://localhost:9000/`, then a file path of
+`/user/hugo/in.txt` is interpreted as `hdfs://localhost:9000/user/hugo/in.txt`.
+
+#### Connection limiting
+
+You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number
+of concurrent reads / writes or open connections at the same time.
+
+For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.
+
+To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by
+its scheme.
+
+~~~
+fs.<scheme>.limit.total: (number, 0/-1 mean no limit)
+fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
+fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
+fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
+fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
+~~~
+
+You can limit the number of input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on
+the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed.
+If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening will fail.
+
+To prevent inactive streams from taking up the complete pool (preventing new connections to be opened), you can add an inactivity timeout for streams:
+`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes for at least that amount of time, it is forcibly closed.
+
+These limits are enforced per TaskManager, so each TaskManager in a Flink application or cluster will open up to that number of connections.
+In addition, the limits are also enforced per FileSystem instance. Because file systems are created per scheme and authority, different
+authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
+
+
+## Adding new File System Implementations
+
+File system implementations are discovered by Flink through Java's service abstraction, making it easy to add additional file system implementations.
+
+In order to add a new File System, the following steps are needed:
+
+  - Add the File System implementation, which is a subclass of `org.apache.flink.core.fs.FileSystem`.
+  - Add a factory that instantiates that file system and declares the scheme under which the FileSystem is registered. This must be a subclass of `org.apache.flink.core.fs.FileSystemFactory`.
+  - Add a service entry. Create a file `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` which contains the class name of your file system factory class.
+
+See the [Java Service Loader docs](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html) for more details on how service loaders work.
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index fcf73b8..f80bd9b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -614,7 +614,10 @@ public final class ConfigConstants {
 	 * Key to specify the default filesystem to be used by a job. In the case of
 	 * <code>file:///</code>, which is the default (see {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}),
 	 * the local filesystem is going to be used to resolve URIs without an explicit scheme.
-	 * */
+	 *
+	 * @deprecated Use {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME} instead.
+	 */
+	@Deprecated
 	public static final String FILESYSTEM_SCHEME = "fs.default-scheme";
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index e8ab8e4..928e810 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -20,6 +20,9 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+/**
+ * The set of configuration options for core parameters.
+ */
 @PublicEvolving
 public class CoreOptions {
 
@@ -83,4 +86,58 @@ public class CoreOptions {
 	public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
 		.key("state.checkpoints.dir")
 		.noDefaultValue();
+
+	// ------------------------------------------------------------------------
+	//  file systems
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The default filesystem scheme, used for paths that do not declare a scheme explicitly.
+	 */
+	public static final ConfigOption<String> DEFAULT_FILESYSTEM_SCHEME = ConfigOptions
+			.key("fs.default-scheme")
+			.noDefaultValue();
+
+	/**
+	 * The total number of input plus output connections that a file system for the given scheme may open.
+	 * Unlimited by default.
+	 */
+	public static ConfigOption<Integer> fileSystemConnectionLimit(String scheme) {
+		return ConfigOptions.key("fs." + scheme + ".limit.total").defaultValue(-1);
+	}
+
+	/**
+	 * The total number of input connections that a file system for the given scheme may open.
+	 * Unlimited by default.
+	 */
+	public static ConfigOption<Integer> fileSystemConnectionLimitIn(String scheme) {
+		return ConfigOptions.key("fs." + scheme + ".limit.input").defaultValue(-1);
+	}
+
+	/**
+	 * The total number of output connections that a file system for the given scheme may open.
+	 * Unlimited by default.
+	 */
+	public static ConfigOption<Integer> fileSystemConnectionLimitOut(String scheme) {
+		return ConfigOptions.key("fs." + scheme + ".limit.output").defaultValue(-1);
+	}
+
+	/**
+	 * If any connection limit is configured, this option can be optionally set to define after
+	 * which time (in milliseconds) stream opening fails with a timeout exception, if no stream
+	 * connection becomes available. Unlimited timeout by default.
+	 */
+	public static ConfigOption<Long> fileSystemConnectionLimitTimeout(String scheme) {
+		return ConfigOptions.key("fs." + scheme + ".limit.timeout").defaultValue(0L);
+	}
+
+	/**
+	 * If any connection limit is configured, this option can be optionally set to define after
+	 * which time (in milliseconds) inactive streams are reclaimed. This option can help to prevent
+	 * that inactive streams make up the full pool of limited connections, and no further connections
+	 * can be established. Unlimited timeout by default.
+	 */
+	public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
+		return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
new file mode 100644
index 0000000..b85a7d6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ConnectionLimitingFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.LimitedConnectionsFileSystem.ConnectionLimitingSettings;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapping factory that adds a {@link LimitedConnectionsFileSystem} to a file system.
+ */
+@Internal
+public class ConnectionLimitingFactory implements FileSystemFactory {
+
+	private final FileSystemFactory factory;
+
+	private final ConnectionLimitingSettings settings;
+
+	private ConnectionLimitingFactory(
+			FileSystemFactory factory,
+			ConnectionLimitingSettings settings) {
+
+		this.factory = checkNotNull(factory);
+		this.settings = checkNotNull(settings);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getScheme() {
+		return factory.getScheme();
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		factory.configure(config);
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		FileSystem original = factory.create(fsUri);
+		return new LimitedConnectionsFileSystem(original,
+				settings.limitTotal, settings.limitOutput, settings.limitInput,
+				settings.streamOpenTimeout, settings.streamInactivityTimeout);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Decorates the given factory for a {@code ConnectionLimitingFactory}, if the given
+	 * configuration configured connection limiting for the given file system scheme.
+	 * Otherwise, it returns the given factory as is.
+	 *
+	 * @param factory The factory to potentially decorate.
+	 * @param scheme The file scheme for which to check the configuration.
+	 * @param config The configuration
+	 *
+	 * @return The decorated factory, if connection limiting is configured, the original factory otherwise.
+	 */
+	public static FileSystemFactory decorateIfLimited(FileSystemFactory factory, String scheme, Configuration config) {
+		checkNotNull(factory, "factory");
+
+		final ConnectionLimitingSettings settings = ConnectionLimitingSettings.fromConfig(config, scheme);
+
+		// decorate only if any limit is configured
+		if (settings == null) {
+			// no limit configured
+			return factory;
+		}
+		else {
+			return new ConnectionLimitingFactory(factory, settings);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 7a8245a..18baaa5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -26,8 +26,8 @@ package org.apache.flink.core.fs;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.fs.local.LocalFileSystemFactory;
@@ -43,8 +43,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.ServiceLoader;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -214,8 +217,12 @@ public abstract class FileSystem {
 	/** Cache for file systems, by scheme + authority. */
 	private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
 
-	/** Mapping of file system schemes to  the corresponding implementation factories. */
-	private static final HashMap<String, FileSystemFactory> FS_FACTORIES = loadFileSystems();
+	/** All available file system factories. */
+	private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();
+
+	/** Mapping of file system schemes to the corresponding factories,
+	 * populated in {@link FileSystem#initialize(Configuration)}. */
+	private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
 
 	/** The default factory that is used when no scheme matches. */
 	private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();
@@ -236,7 +243,7 @@ public abstract class FileSystem {
 	 * of this method, this method clears the file system instance cache.
 	 *
 	 * <p>This method also reads the default file system URI from the configuration key
-	 * {@link ConfigConstants#FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
+	 * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
 	 * the URI has no scheme will be interpreted as relative to that URI.
 	 * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
 	 * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
@@ -249,17 +256,22 @@ public abstract class FileSystem {
 		try {
 			// make sure file systems are re-instantiated after re-configuration
 			CACHE.clear();
+			FS_FACTORIES.clear();
 
 			// configure all file system factories
-			for (FileSystemFactory factory : FS_FACTORIES.values()) {
+			for (FileSystemFactory factory : RAW_FACTORIES) {
 				factory.configure(config);
+				String scheme = factory.getScheme();
+
+				FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
+				FS_FACTORIES.put(scheme, fsf);
 			}
 
 			// configure the default (fallback) factory
 			FALLBACK_FACTORY.configure(config);
 
 			// also read the default file system scheme
-			final String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME, null);
+			final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
 			if (stringifiedUri == null) {
 				DEFAULT_SCHEME = null;
 			}
@@ -269,7 +281,7 @@ public abstract class FileSystem {
 				}
 				catch (URISyntaxException e) {
 					throw new IllegalConfigurationException("The default file system scheme ('" +
-							ConfigConstants.FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
+							CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
 				}
 			}
 		}
@@ -368,6 +380,13 @@ public abstract class FileSystem {
 				}
 			}
 
+			// this "default" initialization makes sure that the FileSystem class works
+			// even when not configured with an explicit Flink configuration, like on
+			// JobManager or TaskManager setup
+			if (FS_FACTORIES.isEmpty()) {
+				initialize(new Configuration());
+			}
+
 			// Try to create a new file system
 			final FileSystem fs;
 			final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());
@@ -907,11 +926,11 @@ public abstract class FileSystem {
 	 *
 	 * @return A map from the file system scheme to corresponding file system factory.
 	 */
-	private static HashMap<String, FileSystemFactory> loadFileSystems() {
-		final HashMap<String, FileSystemFactory> map = new HashMap<>();
+	private static List<FileSystemFactory> loadFileSystems() {
+		final ArrayList<FileSystemFactory> list = new ArrayList<>();
 
 		// by default, we always have the local file system factory
-		map.put("file", new LocalFileSystemFactory());
+		list.add(new LocalFileSystemFactory());
 
 		LOG.debug("Loading extension file systems via services");
 
@@ -926,9 +945,8 @@ public abstract class FileSystem {
 			while (iter.hasNext()) {
 				try {
 					FileSystemFactory factory = iter.next();
-					String scheme = factory.getScheme();
-					map.put(scheme, factory);
-					LOG.debug("Added file system {}:{}", scheme, factory.getClass().getName());
+					list.add(factory);
+					LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName());
 				}
 				catch (Throwable t) {
 					// catching Throwable here to handle various forms of class loading
@@ -945,7 +963,7 @@ public abstract class FileSystem {
 			LOG.error("Failed to load additional file systems via services", t);
 		}
 
-		return map;
+		return Collections.unmodifiableList(list);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
new file mode 100644
index 0000000..5353563
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
@@ -0,0 +1,1114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * <p>This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and connections
+ * would fail in that case. This happens, for example, for very small HDFS clusters
+ * with few RPC handlers, when a large Flink job tries to build up many connections during
+ * a checkpoint.
+ *
+ * <p>The filesystem may track the progress of streams and close streams that have been
+ * inactive for too long, to avoid locked streams from taking up the complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to open a new stream
+ * periodically check the currently open streams once the limit of open streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+	/** The original file system to which connections are limited. */
+	private final FileSystem originalFs;
+
+	/** The lock that synchronizes connection bookkeeping. */
+	private final ReentrantLock lock;
+
+	/** Condition for threads that are blocking on the availability of new connections. */
+	private final Condition available;
+
+	/** The maximum number of concurrently open output streams. */
+	private final int maxNumOpenOutputStreams;
+
+	/** The maximum number of concurrently open input streams. */
+	private final int maxNumOpenInputStreams;
+
+	/** The maximum number of concurrently open streams (input + output). */
+	private final int maxNumOpenStreamsTotal;
+
+	/** The nanoseconds that opening a stream may wait for availability. */
+	private final long streamOpenTimeoutNanos;
+
+	/** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
+	private final long streamInactivityTimeoutNanos;
+
+	/** The set of currently open output streams. */
+	@GuardedBy("lock")
+	private final HashSet<OutStream> openOutputStreams;
+
+	/** The set of currently open input streams. */
+	@GuardedBy("lock")
+	private final HashSet<InStream> openInputStreams;
+
+	/** The number of output streams reserved to be opened. */
+	@GuardedBy("lock")
+	private int numReservedOutputStreams;
+
+	/** The number of input streams reserved to be opened. */
+	@GuardedBy("lock")
+	private int numReservedInputStreams;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new output connection limiting file system.
+	 *
+	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
+	 * then they are terminated as "inactive", to prevent that the limited number of connections gets
+	 * stuck on only blocked threads.
+	 *
+	 * @param originalFs              The original file system to which connections are limited.
+	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
+	 */
+	public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
+		this(originalFs, maxNumOpenStreamsTotal, 0, 0);
+	}
+
+	/**
+	 * Creates a new output connection limiting file system.
+	 *
+	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
+	 * then they are terminated as "inactive", to prevent that the limited number of connections gets
+	 * stuck on only blocked threads.
+	 *
+	 * @param originalFs              The original file system to which connections are limited.
+	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
+	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
+	 *                                no more connections are currently permitted.
+	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
+	 *                                bytes before it is closed as inactive.
+	 */
+	public LimitedConnectionsFileSystem(
+			FileSystem originalFs,
+			int maxNumOpenStreamsTotal,
+			long streamOpenTimeout,
+			long streamInactivityTimeout) {
+		this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout);
+	}
+
+	/**
+	 * Creates a new output connection limiting file system, limiting input and output streams with
+	 * potentially different quotas.
+	 *
+	 * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
+	 * then they are terminated as "inactive", to prevent that the limited number of connections gets
+	 * stuck on only blocked threads.
+	 *
+	 * @param originalFs              The original file system to which connections are limited.
+	 * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
+	 * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
+	 * @param maxNumOpenInputStreams  The maximum number of concurrent open input streams (0 means no limit).
+	 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
+	 *                                no more connections are currently permitted.
+	 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
+	 *                                bytes before it is closed as inactive.
+	 */
+	public LimitedConnectionsFileSystem(
+			FileSystem originalFs,
+			int maxNumOpenStreamsTotal,
+			int maxNumOpenOutputStreams,
+			int maxNumOpenInputStreams,
+			long streamOpenTimeout,
+			long streamInactivityTimeout) {
+
+		checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
+		checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
+		checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
+		checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)");
+		checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");
+
+		this.originalFs = checkNotNull(originalFs, "originalFs");
+		this.lock = new ReentrantLock(true);
+		this.available = lock.newCondition();
+		this.openOutputStreams = new HashSet<>();
+		this.openInputStreams = new HashSet<>();
+		this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
+		this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
+		this.maxNumOpenInputStreams = maxNumOpenInputStreams;
+
+		// assign nanos overflow aware
+		final long openTimeoutNanos = streamOpenTimeout * 1_000_000;
+		final long inactivityTimeoutNanos = streamInactivityTimeout * 1_000_000;
+
+		this.streamOpenTimeoutNanos =
+				openTimeoutNanos >= streamOpenTimeout ? openTimeoutNanos : Long.MAX_VALUE;
+
+		this.streamInactivityTimeoutNanos =
+				inactivityTimeoutNanos >= streamInactivityTimeout ? inactivityTimeoutNanos : Long.MAX_VALUE;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the maximum number of concurrently open output streams.
+	 */
+	public int getMaxNumOpenOutputStreams() {
+		return maxNumOpenOutputStreams;
+	}
+
+	/**
+	 * Gets the maximum number of concurrently open input streams.
+	 */
+	public int getMaxNumOpenInputStreams() {
+		return maxNumOpenInputStreams;
+	}
+
+	/**
+	 * Gets the maximum number of concurrently open streams (input + output).
+	 */
+	public int getMaxNumOpenStreamsTotal() {
+		return maxNumOpenStreamsTotal;
+	}
+
+	/**
+	 * Gets the number of milliseconds that opening a stream may wait for availability in the
+	 * connection pool.
+	 */
+	public long getStreamOpenTimeout() {
+		return streamOpenTimeoutNanos / 1_000_000;
+	}
+
+	/**
+	 * Gets the milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
+	 */
+	public long getStreamInactivityTimeout() {
+		return streamInactivityTimeoutNanos / 1_000_000;
+	}
+
+	/**
+	 * Gets the total number of open streams (input plus output).
+	 */
+	public int getTotalNumberOfOpenStreams() {
+		lock.lock();
+		try {
+			return numReservedOutputStreams + numReservedInputStreams;
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets the number of currently open output streams.
+	 */
+	public int getNumberOfOpenOutputStreams() {
+		lock.lock();
+		try {
+			return numReservedOutputStreams;
+		}
+		finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Gets the number of currently open input streams.
+	 */
+	public int getNumberOfOpenInputStreams() {
+		return numReservedInputStreams;
+	}
+
+	// ------------------------------------------------------------------------
+	//  input & output stream opening methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
+		return createOutputStream(() -> originalFs.create(f, overwriteMode));
+	}
+
+	@Override
+	@Deprecated
+	@SuppressWarnings("deprecation")
+	public FSDataOutputStream create(
+			Path f,
+			boolean overwrite,
+			int bufferSize,
+			short replication,
+			long blockSize) throws IOException {
+
+		return createOutputStream(() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize));
+	}
+
+	@Override
+	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+		return createInputStream(() -> originalFs.open(f, bufferSize));
+	}
+
+	@Override
+	public FSDataInputStream open(Path f) throws IOException {
+		return createInputStream(() -> originalFs.open(f));
+	}
+
+	private FSDataOutputStream createOutputStream(
+			final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
+
+		final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
+				() -> new OutStream(streamOpener.get(), this);
+
+		return createStream(wrappedStreamOpener, openOutputStreams, true);
+	}
+
+	private FSDataInputStream createInputStream(
+			final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
+
+		final SupplierWithException<InStream, IOException> wrappedStreamOpener =
+				() -> new InStream(streamOpener.get(), this);
+
+		return createStream(wrappedStreamOpener, openInputStreams, false);
+	}
+
+	// ------------------------------------------------------------------------
+	//  other delegating file system methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public FileSystemKind getKind() {
+		return originalFs.getKind();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return originalFs.isDistributedFS();
+	}
+
+	@Override
+	public Path getWorkingDirectory() {
+		return originalFs.getWorkingDirectory();
+	}
+
+	@Override
+	public Path getHomeDirectory() {
+		return originalFs.getHomeDirectory();
+	}
+
+	@Override
+	public URI getUri() {
+		return originalFs.getUri();
+	}
+
+	@Override
+	public FileStatus getFileStatus(Path f) throws IOException {
+		return originalFs.getFileStatus(f);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+		return originalFs.getFileBlockLocations(file, start, len);
+	}
+
+	@Override
+	public FileStatus[] listStatus(Path f) throws IOException {
+		return originalFs.listStatus(f);
+	}
+
+	@Override
+	public boolean delete(Path f, boolean recursive) throws IOException {
+		return originalFs.delete(f, recursive);
+	}
+
+	@Override
+	public boolean mkdirs(Path f) throws IOException {
+		return originalFs.mkdirs(f);
+	}
+
+	@Override
+	public boolean rename(Path src, Path dst) throws IOException {
+		return originalFs.rename(src, dst);
+	}
+
+	@Override
+	public boolean exists(Path f) throws IOException {
+		return originalFs.exists(f);
+	}
+
+	@Override
+	@Deprecated
+	@SuppressWarnings("deprecation")
+	public long getDefaultBlockSize() {
+		return originalFs.getDefaultBlockSize();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private <T extends StreamWithTimeout> T createStream(
+			final SupplierWithException<T, IOException> streamOpener,
+			final HashSet<T> openStreams,
+			final boolean output) throws IOException {
+
+		final int outputLimit = output && maxNumOpenInputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
+		final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
+		final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
+		final int outputCredit = output ? 1 : 0;
+		final int inputCredit = output ? 0 : 1;
+
+		// because waiting for availability may take long, we need to be interruptible here
+		// and handle interrupted exceptions as I/O errors
+		// even though the code is written to make sure the lock is held for a short time only,
+		// making the lock acquisition interruptible helps to guard against the cases where
+		// a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
+		try {
+			lock.lockInterruptibly();
+			try {
+				// some integrity checks
+				assert openOutputStreams.size() <= numReservedOutputStreams;
+				assert openInputStreams.size() <= numReservedInputStreams;
+
+				// wait until there are few enough streams so we can open another
+				waitForAvailability(totalLimit, outputLimit, inputLimit);
+
+				// We do not open the stream here in the locked scope because opening a stream
+				// could take a while. Holding the lock during that operation would block all concurrent
+				// attempts to try and open a stream, effectively serializing all calls to open the streams.
+				numReservedOutputStreams += outputCredit;
+				numReservedInputStreams += inputCredit;
+			}
+			finally {
+				lock.unlock();
+			}
+		}
+		catch (InterruptedException e) {
+			// restore interruption flag
+			Thread.currentThread().interrupt();
+			throw new IOException("interrupted before opening stream");
+		}
+
+		// open the stream outside the lock.
+		boolean success = false;
+		try {
+			final T out = streamOpener.get();
+
+			// add the stream to the set, need to re-acquire the lock
+			lock.lock();
+			try {
+				openStreams.add(out);
+			} finally {
+				lock.unlock();
+			}
+
+			// good, can now return cleanly
+			success = true;
+			return out;
+		}
+		finally {
+			if (!success) {
+				// remove the reserved credit
+				// we must open this non-interruptibly, because this must succeed!
+				lock.lock();
+				try {
+					numReservedOutputStreams -= outputCredit;
+					numReservedInputStreams -= inputCredit;
+					available.signalAll();
+				} finally {
+					lock.unlock();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Blocks until the given limits permit opening one more stream, or until the
+	 * configured stream-open timeout expires, in which case an IOException is thrown.
+	 *
+	 * <p>Must be called while holding {@link #lock}. If a stream inactivity timeout
+	 * is configured, this method also closes streams that made no progress, in order
+	 * to free up stream slots.
+	 *
+	 * @throws InterruptedException If the waiting thread is interrupted.
+	 * @throws IOException If no stream slot became available before the open timeout.
+	 */
+	@GuardedBy("lock")
+	private void waitForAvailability(
+			int totalLimit,
+			int outputLimit,
+			int inputLimit) throws InterruptedException, IOException {
+
+		checkState(lock.isHeldByCurrentThread());
+
+		// compute the deadline of this operation
+		final long deadline;
+		if (streamOpenTimeoutNanos == 0) {
+			// zero means "no timeout": use a deadline that never expires
+			deadline = Long.MAX_VALUE;
+		} else {
+			long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
+			// check for overflow
+			deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
+		}
+
+		// wait for available connections
+		long timeLeft;
+
+		if (streamInactivityTimeoutNanos == 0) {
+			// simple case: just wait
+			while ((timeLeft = (deadline - System.nanoTime())) > 0 &&
+					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
+
+				available.await(timeLeft, TimeUnit.NANOSECONDS);
+			}
+		}
+		else {
+			// complex case: chase down inactive streams.
+			// wake up at least twice per inactivity interval, so no inactive stream is missed
+			final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;
+
+			long now;
+			while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while still within timeout
+					!hasAvailability(totalLimit, outputLimit, inputLimit)) {
+
+				// check all streams whether there is one that has been inactive for too long
+				if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams, now))) {
+					// only wait if we did not manage to close any stream.
+					// otherwise eagerly check again if we have availability now (we should have!)
+					long timeToWait = Math.min(checkIntervalNanos, timeLeft);
+					available.await(timeToWait, TimeUnit.NANOSECONDS);
+				}
+			}
+		}
+
+		// check for timeout
+		// we check availability again to catch cases where the timeout expired while waiting
+		// to re-acquire the lock
+		if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) {
+			throw new IOException(String.format(
+					"Timeout while waiting for an available stream/connection. " +
+					"limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms",
+					maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
+					numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
+		}
+	}
+
+	/**
+	 * Checks whether one more stream may be opened under the given per-type and
+	 * total limits. Must be called while holding {@link #lock}.
+	 */
+	@GuardedBy("lock")
+	private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
+		return numReservedOutputStreams < outputLimit &&
+				numReservedInputStreams < inputLimit &&
+				numReservedOutputStreams + numReservedInputStreams < totalLimit;
+	}
+
+	/**
+	 * Tries to close one stream from the given set that has been inactive for longer
+	 * than the inactivity timeout. Must be called while holding {@link #lock}.
+	 *
+	 * <p>NOTE(review): if the first stream examined was checked too recently, this
+	 * returns false without scanning the remaining streams. Presumably intentional,
+	 * since all streams are checked on roughly the same schedule — worth confirming.
+	 *
+	 * @return True, if an inactive stream was closed, false otherwise.
+	 */
+	@GuardedBy("lock")
+	private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
+		for (StreamWithTimeout stream : streams) {
+			try {
+				final StreamProgressTracker tracker = stream.getProgressTracker();
+
+				// If the stream is closed already, it will be removed anyways, so we
+				// do not classify it as inactive. We also skip the check if another check happened too recently.
+				if (stream.isClosed() || nowNanos < tracker.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) {
+					// interval since last check not yet over
+					return false;
+				}
+				else if (!tracker.checkNewBytesAndMark(nowNanos)) {
+					// no progress since the last check: close it.
+					// returning immediately also avoids mutating the set while iterating it
+					stream.closeDueToTimeout();
+					return true;
+				}
+			}
+			catch (StreamTimeoutException ignored) {
+				// may happen due to races
+			}
+			catch (IOException e) {
+				// only log on debug level here, to avoid log spamming
+				LOG.debug("Could not check for stream progress to determine inactivity", e);
+			}
+		}
+
+		return false;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Atomically removes the given output stream from the set of currently open output streams,
+	 * and signals that a new stream can now be opened.
+	 *
+	 * <p>If the stream is not (or no longer) in the set, this is a no-op, so the
+	 * reserved-stream count is decremented at most once per stream.
+	 */
+	void unregisterOutputStream(OutStream stream) {
+		lock.lock();
+		try {
+			// only decrement if we actually remove the stream
+			if (openOutputStreams.remove(stream)) {
+				numReservedOutputStreams--;
+				available.signalAll();
+			}
+		}
+		finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Atomically removes the given input stream from the set of currently open input streams,
+	 * and signals that a new stream can now be opened.
+	 *
+	 * <p>If the stream is not (or no longer) in the set, this is a no-op, so the
+	 * reserved-stream count is decremented at most once per stream.
+	 */
+	void unregisterInputStream(InStream stream) {
+		lock.lock();
+		try {
+			// only decrement if we actually remove the stream
+			if (openInputStreams.remove(stream)) {
+				numReservedInputStreams--;
+				available.signalAll();
+			}
+		}
+		finally {
+			lock.unlock();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A special IOException, indicating a timeout in the data output stream.
+	 */
+	public static final class StreamTimeoutException extends IOException {
+
+		private static final long serialVersionUID = -8790922066795901928L;
+
+		/** Creates a new exception with the standard inactivity-timeout message. */
+		public StreamTimeoutException() {
+			super("Stream closed due to inactivity timeout. " +
+					"This is done to prevent inactive streams from blocking the full " +
+					"pool of limited connections");
+		}
+
+		/** Creates a copy of the given exception that captures the current call's
+		 * stack trace, keeping the original exception as the cause. */
+		public StreamTimeoutException(StreamTimeoutException other) {
+			super(other.getMessage(), other);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Interface for streams that can be checked for inactivity.
+	 *
+	 * <p>Implemented by {@link OutStream} and {@link InStream}, so that the
+	 * inactivity check can treat open input and output streams uniformly.
+	 */
+	private interface StreamWithTimeout extends Closeable {
+
+		/**
+		 * Gets the progress tracker for this stream.
+		 */
+		StreamProgressTracker getProgressTracker();
+
+		/**
+		 * Gets the current position in the stream, as in number of bytes read or written.
+		 */
+		long getPos() throws IOException;
+
+		/**
+		 * Closes the stream asynchronously with a special exception that indicates closing due
+		 * to lack of progress.
+		 */
+		void closeDueToTimeout() throws IOException;
+
+		/**
+		 * Checks whether the stream was closed already.
+		 */
+		boolean isClosed();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A tracker for stream progress. This records the number of bytes read / written together
+	 * with a timestamp when the last check happened.
+	 *
+	 * <p>The fields are volatile, presumably because the checking thread may differ
+	 * from the thread using the stream — TODO confirm against the callers.
+	 */
+	private static final class StreamProgressTracker {
+
+		/** The tracked stream. */
+		private final StreamWithTimeout stream;
+
+		/** The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)}
+		 * method was called. It is important to initialize this with {@code -1} so that the
+		 * first check (0 bytes) always appears to have made progress. */
+		private volatile long lastCheckBytes = -1;
+
+		/** The timestamp when the last inactivity evaluation was made. */
+		private volatile long lastCheckTimestampNanos;
+
+		StreamProgressTracker(StreamWithTimeout stream) {
+			this.stream = stream;
+		}
+
+		/**
+		 * Gets the timestamp when the last inactivity evaluation was made.
+		 */
+		public long getLastCheckTimestampNanos() {
+			return lastCheckTimestampNanos;
+		}
+
+		/**
+		 * Checks whether there were new bytes since the last time this method was invoked.
+		 * This also sets the given timestamp, to be read via {@link #getLastCheckTimestampNanos()}.
+		 *
+		 * @param timestamp The timestamp (in nanoseconds) to record for this check.
+		 *
+		 * @return True, if there were new bytes, false if not.
+		 * @throws IOException If querying the stream position fails.
+		 */
+		public boolean checkNewBytesAndMark(long timestamp) throws IOException {
+			// remember the time when checked
+			lastCheckTimestampNanos = timestamp;
+
+			final long bytesNow = stream.getPos();
+			if (bytesNow > lastCheckBytes) {
+				lastCheckBytes = bytesNow;
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A data output stream that wraps a given data output stream and un-registers
+	 * from a given connection-limiting file system
+	 * (via {@link LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)})
+	 * upon closing.
+	 */
+	private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout {
+
+		/** The original data output stream to write to. */
+		private final FSDataOutputStream originalStream;
+
+		/** The connection-limiting file system to un-register from. */
+		private final LimitedConnectionsFileSystem fs;
+
+		/** The progress tracker for this stream. */
+		private final StreamProgressTracker progressTracker;
+
+		/** An exception with which the stream has been externally closed. Volatile,
+		 * because it is set via {@link #closeDueToTimeout()} possibly from a different
+		 * thread than the one using the stream. */
+		private volatile StreamTimeoutException timeoutException;
+
+		/** Flag tracking whether the stream was already closed, for proper inactivity tracking.
+		 * Final: the reference is never reassigned, and 'final' guarantees its safe
+		 * publication to the thread that may close this stream due to inactivity. */
+		private final AtomicBoolean closed = new AtomicBoolean();
+
+		OutStream(FSDataOutputStream originalStream, LimitedConnectionsFileSystem fs) {
+			this.originalStream = checkNotNull(originalStream);
+			this.fs = checkNotNull(fs);
+			this.progressTracker = new StreamProgressTracker(this);
+		}
+
+		// --- FSDataOutputStream API implementation
+
+		@Override
+		public void write(int b) throws IOException {
+			try {
+				originalStream.write(b);
+			}
+			catch (IOException e) {
+				handleIOException(e);
+			}
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			try {
+				originalStream.write(b, off, len);
+			}
+			catch (IOException e) {
+				handleIOException(e);
+			}
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			try {
+				return originalStream.getPos();
+			}
+			catch (IOException e) {
+				handleIOException(e);
+				return -1; // silence the compiler
+			}
+		}
+
+		@Override
+		public void flush() throws IOException {
+			try {
+				originalStream.flush();
+			}
+			catch (IOException e) {
+				handleIOException(e);
+			}
+		}
+
+		@Override
+		public void sync() throws IOException {
+			try {
+				originalStream.sync();
+			}
+			catch (IOException e) {
+				handleIOException(e);
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			// compareAndSet makes sure we close and un-register exactly once,
+			// even under concurrent calls from the user and the inactivity checker
+			if (closed.compareAndSet(false, true)) {
+				try {
+					originalStream.close();
+				}
+				catch (IOException e) {
+					handleIOException(e);
+				}
+				finally {
+					fs.unregisterOutputStream(this);
+				}
+			}
+		}
+
+		@Override
+		public void closeDueToTimeout() throws IOException {
+			this.timeoutException = new StreamTimeoutException();
+			close();
+		}
+
+		@Override
+		public boolean isClosed() {
+			return closed.get();
+		}
+
+		@Override
+		public StreamProgressTracker getProgressTracker() {
+			return progressTracker;
+		}
+
+		/** Rethrows the given exception, or replaces it with the pending timeout
+		 * exception (keeping the given exception as suppressed), if this stream was
+		 * closed due to inactivity. */
+		private void handleIOException(IOException exception) throws IOException {
+			if (timeoutException == null) {
+				throw exception;
+			} else {
+				// throw a new exception to capture this call's stack trace
+				// the new exception is forwarded as a suppressed exception
+				StreamTimeoutException te = new StreamTimeoutException(timeoutException);
+				te.addSuppressed(exception);
+				throw te;
+			}
+		}
+	}
+
+	/**
+	 * A data input stream that wraps a given data input stream and un-registers
+	 * from a given connection-limiting file system
+	 * (via {@link LimitedConnectionsFileSystem#unregisterInputStream(InStream)})
+	 * upon closing.
+	 */
+	private static final class InStream extends FSDataInputStream implements StreamWithTimeout {
+
+		/** The original data input stream to read from. */
+		private final FSDataInputStream originalStream;
+
+		/** The connection-limiting file system to un-register from. */
+		private final LimitedConnectionsFileSystem fs;
+
+		/** An exception with which the stream has been externally closed. Volatile,
+		 * because it is set via {@link #closeDueToTimeout()} possibly from a different
+		 * thread than the one using the stream. */
+		private volatile StreamTimeoutException timeoutException;
+
+		/** The progress tracker for this stream. */
+		private final StreamProgressTracker progressTracker;
+
+		/** Flag tracking whether the stream was already closed, for proper inactivity tracking.
+		 * Final: the reference is never reassigned, and 'final' guarantees its safe
+		 * publication to the thread that may close this stream due to inactivity. */
+		private final AtomicBoolean closed = new AtomicBoolean();
+
+		InStream(FSDataInputStream originalStream, LimitedConnectionsFileSystem fs) {
+			this.originalStream = checkNotNull(originalStream);
+			this.fs = checkNotNull(fs);
+			this.progressTracker = new StreamProgressTracker(this);
+		}
+
+		// --- FSDataInputStream API implementation
+
+		@Override
+		public int read() throws IOException {
+			try {
+				return originalStream.read();
+			}
+			catch (IOException e) {
+				handleIOException(e);
+				return 0; // silence the compiler
+			}
+		}
+
+		@Override
+		public int read(byte[] b) throws IOException {
+			try {
+				return originalStream.read(b);
+			}
+			catch (IOException e) {
+				handleIOException(e);
+				return 0; // silence the compiler
+			}
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			try {
+				return originalStream.read(b, off, len);
+			}
+			catch (IOException e) {
+				handleIOException(e);
+				return 0; // silence the compiler
+			}
+		}
+
+		@Override
+		public long skip(long n) throws IOException {
+			try {
+				return originalStream.skip(n);
+			}
+			catch (IOException e) {
+				handleIOException(e);
+				return 0L; // silence the compiler
+			}
+		}
+
+		@Override
+		public int available() throws IOException {
+			try {
+				return originalStream.available();
+			}
+			catch (IOException e) {
+				handleIOException(e);
+				return 0; // silence the compiler
+			}
+		}
+
+		@Override
+		public void mark(int readlimit) {
+			originalStream.mark(readlimit);
+		}
+
+		@Override
+		public void reset() throws IOException {
+			try {
+				originalStream.reset();
+			}
+			catch (IOException e) {
+				handleIOException(e);
+			}
+		}
+
+		@Override
+		public boolean markSupported() {
+			return originalStream.markSupported();
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+			try {
+				originalStream.seek(desired);
+			}
+			catch (IOException e) {
+				handleIOException(e);
+			}
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			try {
+				return originalStream.getPos();
+			}
+			catch (IOException e) {
+				handleIOException(e);
+				return 0; // silence the compiler
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			// compareAndSet makes sure we close and un-register exactly once,
+			// even under concurrent calls from the user and the inactivity checker
+			if (closed.compareAndSet(false, true)) {
+				try {
+					originalStream.close();
+				}
+				catch (IOException e) {
+					handleIOException(e);
+				}
+				finally {
+					fs.unregisterInputStream(this);
+				}
+			}
+		}
+
+		@Override
+		public void closeDueToTimeout() throws IOException {
+			this.timeoutException = new StreamTimeoutException();
+			close();
+		}
+
+		@Override
+		public boolean isClosed() {
+			return closed.get();
+		}
+
+		@Override
+		public StreamProgressTracker getProgressTracker() {
+			return progressTracker;
+		}
+
+		/** Rethrows the given exception, or replaces it with the pending timeout
+		 * exception (keeping the given exception as suppressed), if this stream was
+		 * closed due to inactivity. */
+		private void handleIOException(IOException exception) throws IOException {
+			if (timeoutException == null) {
+				throw exception;
+			} else {
+				// throw a new exception to capture this call's stack trace
+				// the new exception is forwarded as a suppressed exception
+				StreamTimeoutException te = new StreamTimeoutException(timeoutException);
+				te.addSuppressed(exception);
+				throw te;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A simple configuration data object capturing the settings for limited connections.
+	 */
+	public static class ConnectionLimitingSettings {
+
+		/** The limit for the total number of connections, or 0, if no limit. */
+		public final int limitTotal;
+
+		/** The limit for the number of input stream connections, or 0, if no limit. */
+		public final int limitInput;
+
+		/** The limit for the number of output stream connections, or 0, if no limit. */
+		public final int limitOutput;
+
+		/** The stream opening timeout for a stream, in milliseconds. */
+		public final long streamOpenTimeout;
+
+		/** The inactivity timeout for a stream, in milliseconds. */
+		public final long streamInactivityTimeout;
+
+		/**
+		 * Creates a new ConnectionLimitingSettings with the given parameters.
+		 *
+		 * @param limitTotal The limit for the total number of connections, or 0, if no limit.
+		 * @param limitInput The limit for the number of input stream connections, or 0, if no limit.
+		 * @param limitOutput The limit for the number of output stream connections, or 0, if no limit.
+		 * @param streamOpenTimeout       The maximum number of milliseconds that the file system will wait when
+		 *                                no more connections are currently permitted.
+		 * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
+		 *                                bytes before it is closed as inactive.
+		 */
+		public ConnectionLimitingSettings(
+				int limitTotal,
+				int limitInput,
+				int limitOutput,
+				long streamOpenTimeout,
+				long streamInactivityTimeout) {
+			checkArgument(limitTotal >= 0);
+			checkArgument(limitInput >= 0);
+			checkArgument(limitOutput >= 0);
+			checkArgument(streamOpenTimeout >= 0);
+			checkArgument(streamInactivityTimeout >= 0);
+
+			this.limitTotal = limitTotal;
+			this.limitInput = limitInput;
+			this.limitOutput = limitOutput;
+			this.streamOpenTimeout = streamOpenTimeout;
+			this.streamInactivityTimeout = streamInactivityTimeout;
+		}
+
+		// --------------------------------------------------------------------
+
+		/**
+		 * Parses and returns the settings for connection limiting, for the file system with
+		 * the given file system scheme.
+		 *
+		 * @param config The configuration to check.
+		 * @param fsScheme The file system scheme.
+		 *
+		 * @return The parsed configuration, or null, if no connection limiting is configured.
+		 */
+		@Nullable
+		public static ConnectionLimitingSettings fromConfig(Configuration config, String fsScheme) {
+			checkNotNull(fsScheme, "fsScheme");
+			checkNotNull(config, "config");
+
+			final ConfigOption<Integer> totalLimitOption = CoreOptions.fileSystemConnectionLimit(fsScheme);
+			final ConfigOption<Integer> limitInOption = CoreOptions.fileSystemConnectionLimitIn(fsScheme);
+			final ConfigOption<Integer> limitOutOption = CoreOptions.fileSystemConnectionLimitOut(fsScheme);
+
+			final int totalLimit = config.getInteger(totalLimitOption);
+			final int limitIn = config.getInteger(limitInOption);
+			final int limitOut = config.getInteger(limitOutOption);
+
+			checkLimit(totalLimit, totalLimitOption);
+			checkLimit(limitIn, limitInOption);
+			checkLimit(limitOut, limitOutOption);
+
+			// create the settings only, if at least one limit is configured.
+			// bugfix: this must be a conjunction ('&&') — with the previous '||',
+			// a single unconfigured limit disabled limiting entirely, even though
+			// other limits were explicitly set
+			if (totalLimit <= 0 && limitIn <= 0 && limitOut <= 0) {
+				// no limit configured
+				return null;
+			}
+			else {
+				final ConfigOption<Long> openTimeoutOption =
+						CoreOptions.fileSystemConnectionLimitTimeout(fsScheme);
+				final ConfigOption<Long> inactivityTimeoutOption =
+						CoreOptions.fileSystemConnectionLimitStreamInactivityTimeout(fsScheme);
+
+				final long openTimeout = config.getLong(openTimeoutOption);
+				final long inactivityTimeout = config.getLong(inactivityTimeoutOption);
+
+				checkTimeout(openTimeout, openTimeoutOption);
+				checkTimeout(inactivityTimeout, inactivityTimeoutOption);
+
+				// map the "-1 = unset" sentinel to 0 ("no limit")
+				return new ConnectionLimitingSettings(
+						totalLimit == -1 ? 0 : totalLimit,
+						limitIn == -1 ? 0 : limitIn,
+						limitOut == -1 ? 0 : limitOut,
+						openTimeout,
+						inactivityTimeout);
+			}
+		}
+
+		/** Validates that a limit is either a positive value, 0 (no limit), or -1 (unset). */
+		private static void checkLimit(int value, ConfigOption<Integer> option) {
+			if (value < -1) {
+				throw new IllegalConfigurationException("Invalid value for '" + option.key() + "': " + value);
+			}
+		}
+
+		/** Validates that a timeout is non-negative. */
+		private static void checkTimeout(long timeout, ConfigOption<Long> option) {
+			if (timeout < 0) {
+				throw new IllegalConfigurationException("Invalid value for '" + option.key() + "': " + timeout);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
index 7cbc2bd..785391a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.local.LocalFileSystem;
 
 import java.net.URI;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
new file mode 100644
index 0000000..63be9bf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+/**
+ * A functional interface for a {@link java.util.function.Supplier} that may
+ * throw exceptions.
+ *
+ * @param <R> The type of the result of the supplier.
+ * @param <E> The type of Exceptions thrown by this function.
+ */
+@FunctionalInterface
+public interface SupplierWithException<R, E extends Throwable> {
+
+	/**
+	 * Gets the result of this supplier.
+	 *
+	 * @return The result of this supplier.
+	 * @throws E This function may throw an exception.
+	 */
+	R get() throws E;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
index 43c79c1..1cbc8f2 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
@@ -61,7 +61,7 @@ public class FilesystemSchemeConfigTest extends TestLogger {
 	@Test
 	public void testExplicitlySetToLocal() throws Exception {
 		final Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.FILESYSTEM_SCHEME, LocalFileSystem.getLocalFsURI().toString());
+		conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, LocalFileSystem.getLocalFsURI().toString());
 		FileSystem.initialize(conf);
 
 		URI justPath = new URI(tempFolder.newFile().toURI().getPath());
@@ -74,7 +74,7 @@ public class FilesystemSchemeConfigTest extends TestLogger {
 	@Test
 	public void testExplicitlySetToOther() throws Exception {
 		final Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
+		conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
 		FileSystem.initialize(conf);
 
 		URI justPath = new URI(tempFolder.newFile().toURI().getPath());
@@ -92,7 +92,7 @@ public class FilesystemSchemeConfigTest extends TestLogger {
 	@Test
 	public void testExplicitlyPathTakesPrecedence() throws Exception {
 		final Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
+		conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
 		FileSystem.initialize(conf);
 
 		URI pathAndScheme = tempFolder.newFile().toURI();

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
new file mode 100644
index 0000000..4742a7e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.testutils.TestFileSystem;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that validate that the configuration for limited FS
+ * connections are properly picked up.
+ */
+public class LimitedConnectionsConfigurationTest {
+
+	@Rule
+	public final TemporaryFolder tempDir = new TemporaryFolder();
+
+	@Test
+	public void testConfiguration() throws Exception {
+		final String fsScheme = TestFileSystem.SCHEME;
+
+		// nothing configured, we should get a regular file system
+		FileSystem schemeFs = FileSystem.get(URI.create(fsScheme + ":///a/b/c"));
+		FileSystem localFs = FileSystem.get(tempDir.newFile().toURI());
+
+		assertFalse(schemeFs instanceof LimitedConnectionsFileSystem);
+		assertFalse(localFs instanceof LimitedConnectionsFileSystem);
+
+		// configure some limits, which should cause "fsScheme" to be limited
+		// (note: only the test scheme is configured, the local FS must stay unwrapped)
+
+		final Configuration config = new Configuration();
+		config.setInteger("fs." + fsScheme + ".limit.total", 42);
+		config.setInteger("fs." + fsScheme + ".limit.input", 11);
+		config.setInteger("fs." + fsScheme + ".limit.output", 40);
+		config.setInteger("fs." + fsScheme + ".limit.timeout", 12345);
+		config.setInteger("fs." + fsScheme + ".limit.stream-timeout", 98765);
+
+		try {
+			FileSystem.initialize(config);
+
+			schemeFs = FileSystem.get(URI.create(fsScheme + ":///a/b/c"));
+			localFs = FileSystem.get(tempDir.newFile().toURI());
+
+			assertTrue(schemeFs instanceof LimitedConnectionsFileSystem);
+			assertFalse(localFs instanceof LimitedConnectionsFileSystem);
+
+			LimitedConnectionsFileSystem limitedFs = (LimitedConnectionsFileSystem) schemeFs;
+			assertEquals(42, limitedFs.getMaxNumOpenStreamsTotal());
+			assertEquals(11, limitedFs.getMaxNumOpenInputStreams());
+			assertEquals(40, limitedFs.getMaxNumOpenOutputStreams());
+			assertEquals(12345, limitedFs.getStreamOpenTimeout());
+			assertEquals(98765, limitedFs.getStreamInactivityTimeout());
+		}
+		finally {
+			// clear all settings, so other tests are not affected by the limits
+			FileSystem.initialize(new Configuration());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a11e2cf0/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
new file mode 100644
index 0000000..b133677
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemDelegationTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that the method delegation works properly for the {@link LimitedConnectionsFileSystem}
+ * and its created input and output streams.
+ */
+public class LimitedConnectionsFileSystemDelegationTest {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	@SuppressWarnings("deprecation")
+	public void testDelegateFsMethods() throws IOException {
+		final FileSystem fs = mock(FileSystem.class);
+		when(fs.open(any(Path.class))).thenReturn(mock(FSDataInputStream.class));
+		when(fs.open(any(Path.class), anyInt())).thenReturn(mock(FSDataInputStream.class));
+		when(fs.create(any(Path.class), anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
+		when(fs.create(any(Path.class), any(WriteMode.class))).thenReturn(mock(FSDataOutputStream.class));
+		when(fs.create(any(Path.class), anyBoolean(), anyInt(), anyShort(), anyLong())).thenReturn(mock(FSDataOutputStream.class));
+
+		final LimitedConnectionsFileSystem lfs = new LimitedConnectionsFileSystem(fs, 1000);
+		final Random rnd = new Random();
+
+		lfs.isDistributedFS();
+		verify(fs).isDistributedFS();
+
+		lfs.getWorkingDirectory();
+		verify(fs).isDistributedFS();
+
+		lfs.getHomeDirectory();
+		verify(fs).getHomeDirectory();
+
+		lfs.getUri();
+		verify(fs).getUri();
+
+		{
+			Path path = mock(Path.class);
+			lfs.getFileStatus(path);
+			verify(fs).getFileStatus(path);
+		}
+
+		{
+			FileStatus path = mock(FileStatus.class);
+			int pos = rnd.nextInt();
+			int len = rnd.nextInt();
+			lfs.getFileBlockLocations(path, pos, len);
+			verify(fs).getFileBlockLocations(path, pos, len);
+		}
+
+		{
+			Path path = mock(Path.class);
+			int bufferSize = rnd.nextInt();
+			lfs.open(path, bufferSize);
+			verify(fs).open(path, bufferSize);
+		}
+
+		{
+			Path path = mock(Path.class);
+			lfs.open(path);
+			verify(fs).open(path);
+		}
+
+		lfs.getDefaultBlockSize();
+		verify(fs).getDefaultBlockSize();
+
+		{
+			Path path = mock(Path.class);
+			lfs.listStatus(path);
+			verify(fs).listStatus(path);
+		}
+
+		{
+			Path path = mock(Path.class);
+			lfs.exists(path);
+			verify(fs).exists(path);
+		}
+
+		{
+			Path path = mock(Path.class);
+			boolean recursive = rnd.nextBoolean();
+			lfs.delete(path, recursive);
+			verify(fs).delete(path, recursive);
+		}
+
+		{
+			Path path = mock(Path.class);
+			lfs.mkdirs(path);
+			verify(fs).mkdirs(path);
+		}
+
+		{
+			Path path = mock(Path.class);
+			boolean overwrite = rnd.nextBoolean();
+			int bufferSize = rnd.nextInt();
+			short replication = (short) rnd.nextInt();
+			long blockSize = rnd.nextInt();
+
+			lfs.create(path, overwrite, bufferSize, replication, blockSize);
+			verify(fs).create(path, overwrite, bufferSize, replication, blockSize);
+		}
+
+		{
+			Path path = mock(Path.class);
+			WriteMode mode = rnd.nextBoolean() ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;
+			lfs.create(path, mode);
+			verify(fs).create(path, mode);
+		}
+
+		{
+			Path path1 = mock(Path.class);
+			Path path2 = mock(Path.class);
+			lfs.rename(path1, path2);
+			verify(fs).rename(path1, path2);
+		}
+
+		{
+			FileSystemKind kind = rnd.nextBoolean() ? FileSystemKind.FILE_SYSTEM : FileSystemKind.OBJECT_STORE;
+			when(fs.getKind()).thenReturn(kind);
+			assertEquals(kind, lfs.getKind());
+			verify(fs).getKind();
+		}
+	}
+
+	@Test
+	public void testDelegateOutStreamMethods() throws IOException {
+
+		// mock the output stream
+		final FSDataOutputStream mockOut = mock(FSDataOutputStream.class);
+		final long outPos = 46651L;
+		when(mockOut.getPos()).thenReturn(outPos);
+
+		final FileSystem fs = mock(FileSystem.class);
+		when(fs.create(any(Path.class), any(WriteMode.class))).thenReturn(mockOut);
+
+		final LimitedConnectionsFileSystem lfs = new LimitedConnectionsFileSystem(fs, 100);
+		final FSDataOutputStream out = lfs.create(mock(Path.class), WriteMode.OVERWRITE);
+
+		// validate the output stream
+
+		out.write(77);
+		verify(mockOut).write(77);
+
+		{
+			byte[] bytes = new byte[1786];
+			out.write(bytes, 100, 111);
+			verify(mockOut).write(bytes, 100, 111);
+		}
+
+		assertEquals(outPos, out.getPos());
+
+		out.flush();
+		verify(mockOut).flush();
+
+		out.sync();
+		verify(mockOut).sync();
+
+		out.close();
+		verify(mockOut).close();
+	}
+
+	@Test
+	public void testDelegateInStreamMethods() throws IOException {
+		// mock the input stream
+		final FSDataInputStream mockIn = mock(FSDataInputStream.class);
+		final int value = 93;
+		final int bytesRead = 11;
+		final long inPos = 93;
+		final int available = 17;
+		final boolean markSupported = true;
+		when(mockIn.read()).thenReturn(value);
+		when(mockIn.read(any(byte[].class), anyInt(), anyInt())).thenReturn(11);
+		when(mockIn.getPos()).thenReturn(inPos);
+		when(mockIn.available()).thenReturn(available);
+		when(mockIn.markSupported()).thenReturn(markSupported);
+
+		final FileSystem fs = mock(FileSystem.class);
+		when(fs.open(any(Path.class))).thenReturn(mockIn);
+
+		final LimitedConnectionsFileSystem lfs = new LimitedConnectionsFileSystem(fs, 100);
+		final FSDataInputStream in = lfs.open(mock(Path.class));
+
+		// validate the input stream
+
+		assertEquals(value, in.read());
+		assertEquals(bytesRead, in.read(new byte[11], 2, 5));
+
+		assertEquals(inPos, in.getPos());
+
+		in.seek(17876);
+		verify(mockIn).seek(17876);
+
+		assertEquals(available, in.available());
+
+		assertEquals(markSupported, in.markSupported());
+
+		in.mark(9876);
+		verify(mockIn).mark(9876);
+
+		in.close();
+		verify(mockIn).close();
+	}
+}