Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 17:54:05 UTC

[3/3] flink git commit: [FLINK-7768] [core] Load File Systems via Java Service abstraction

[FLINK-7768] [core] Load File Systems via Java Service abstraction

This changes the discovery mechanism for file systems from static class name configuration
to a service mechanism (META-INF/services).

As part of that, it factors the HDFS and MapR FS implementations out into separate modules.

With this change, users can add new filesystem implementations and make them available
by simply adding them to the class path.
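
For illustration, a factory for a custom file system could look like the
following minimal sketch (the package, class, and 'myfs' scheme are
hypothetical; the three methods are those of the FileSystemFactory
interface as of this commit):

    package com.example.fs;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemFactory;

    import java.io.IOException;
    import java.net.URI;

    public class MyFileSystemFactory implements FileSystemFactory {

        @Override
        public String getScheme() {
            // URIs such as 'myfs://host/path' are routed to this factory
            return "myfs";
        }

        @Override
        public void configure(Configuration config) {
            // stateless example: nothing to configure
        }

        @Override
        public FileSystem create(URI fsUri) throws IOException {
            // MyFileSystem is a hypothetical FileSystem subclass
            return new MyFileSystem(fsUri);
        }
    }

The jar then advertises the factory through a service entry file
META-INF/services/org.apache.flink.core.fs.FileSystemFactory containing
the line:

    com.example.fs.MyFileSystemFactory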


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77e3701c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77e3701c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77e3701c

Branch: refs/heads/master
Commit: 77e3701ca1f8bfab33a07f11992955eb131126c3
Parents: bad3df5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 5 18:12:21 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 6 17:45:13 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 125 +++++-
 .../apache/flink/core/fs/FileSystemFactory.java |   5 +
 .../flink/core/fs/UnsupportedSchemeFactory.java |  72 ++++
 .../HadoopFileSystemFactoryLoader.java          |  81 ----
 .../fs/factories/LocalFileSystemFactory.java    |  44 ---
 .../flink/core/fs/factories/MapRFsFactory.java  |  75 ----
 .../fs/factories/UnsupportedSchemeFactory.java  |  65 ----
 .../core/fs/local/LocalFileSystemFactory.java   |  49 +++
 .../io/DelimitedInputFormatSamplingTest.java    |   6 -
 .../apache/flink/testutils/TestFileSystem.java  |  31 +-
 .../org.apache.flink.core.fs.FileSystemFactory  |  16 +
 flink-dist/pom.xml                              |  15 +
 flink-filesystems/flink-hadoop-fs/pom.xml       |  62 +++
 .../runtime/fs/hdfs/HadoopBlockLocation.java    | 133 +++++++
 .../runtime/fs/hdfs/HadoopDataInputStream.java  | 139 +++++++
 .../runtime/fs/hdfs/HadoopDataOutputStream.java |  78 ++++
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java |  86 +++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 171 +++++++++
 .../flink/runtime/fs/hdfs/HadoopFsFactory.java  | 186 +++++++++
 .../apache/flink/runtime/util/HadoopUtils.java  | 123 ++++++
 .../fs/hdfs/HadoopConfigLoadingTest.java        | 194 ++++++++++
 .../fs/hdfs/HadoopDataInputStreamTest.java      | 132 +++++++
 .../fs/hdfs/HadoopFreeFsFactoryTest.java        |  83 ++++
 .../flink/runtime/fs/hdfs/HadoopFreeTests.java  |  66 ++++
 .../runtime/fs/hdfs/HadoopFsFactoryTest.java    |  63 ++++
 .../src/test/resources/core-site.xml            |  29 ++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-filesystems/flink-mapr-fs/pom.xml         |  77 ++++
 .../flink/runtime/fs/maprfs/MapRFileSystem.java | 175 +++++++++
 .../flink/runtime/fs/maprfs/MapRFsFactory.java  |  75 ++++
 .../org.apache.flink.core.fs.FileSystemFactory  |  16 +
 .../runtime/fs/maprfs/FileSystemAccessTest.java |  42 +++
 .../flink/runtime/fs/maprfs/MapRFreeTests.java  |  74 ++++
 .../runtime/fs/maprfs/MapRFsFactoryTest.java    | 116 ++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-filesystems/pom.xml                       |  43 +++
 flink-runtime/pom.xml                           |  11 +
 .../runtime/fs/hdfs/HadoopBlockLocation.java    | 133 -------
 .../runtime/fs/hdfs/HadoopDataInputStream.java  | 139 -------
 .../runtime/fs/hdfs/HadoopDataOutputStream.java |  78 ----
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java |  86 -----
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 171 ---------
 .../flink/runtime/fs/hdfs/HadoopFsFactory.java  | 167 --------
 .../flink/runtime/fs/maprfs/MapRFileSystem.java | 377 -------------------
 .../apache/flink/runtime/util/HadoopUtils.java  | 121 ------
 .../fs/hdfs/HadoopDataInputStreamTest.java      | 131 -------
 pom.xml                                         |   1 +
 tools/travis_mvn_watchdog.sh                    |  17 +
 48 files changed, 2530 insertions(+), 1703 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 6c18735..a6c9b50 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -30,10 +30,12 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.factories.HadoopFileSystemFactoryLoader;
-import org.apache.flink.core.fs.factories.MapRFsFactory;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.core.fs.factories.LocalFileSystemFactory;
+import org.apache.flink.core.fs.local.LocalFileSystemFactory;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -42,7 +44,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
-import java.util.Map;
+import java.util.Iterator;
+import java.util.ServiceLoader;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -198,6 +201,9 @@ public abstract class FileSystem {
 
 	// ------------------------------------------------------------------------
 
+	/** Logger for all FileSystem work */
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
+
 	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
 	 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
 	private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
@@ -206,27 +212,23 @@ public abstract class FileSystem {
 	private static final ReentrantLock LOCK = new ReentrantLock(true);
 
 	/** Cache for file systems, by scheme + authority. */
-	private static final Map<FSKey, FileSystem> CACHE = new HashMap<>();
+	private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
 
 	/** Mapping of file system schemes to  the corresponding implementation factories. */
-	private static final Map<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
+	private static final HashMap<String, FileSystemFactory> FS_FACTORIES = loadFileSystems();
 
 	/** The default factory that is used when no scheme matches. */
-	private static final FileSystemFactory FALLBACK_FACTORY = HadoopFileSystemFactoryLoader.loadFactory();
+	private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();
 
 	/** The default filesystem scheme to be used, configured during process-wide initialization.
 	 * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
 	private static URI DEFAULT_SCHEME;
+	
 
 	// ------------------------------------------------------------------------
 	//  Initialization
 	// ------------------------------------------------------------------------
 
-	static {
-		FS_FACTORIES.put("file", new LocalFileSystemFactory());
-		FS_FACTORIES.put("maprfs", new MapRFsFactory());
-	}
-
 	/**
 	 * Initializes the shared file system settings. 
 	 *
@@ -892,6 +894,105 @@ public abstract class FileSystem {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Loads the factories for the file systems directly supported by Flink.
+	 * Aside from the {@link LocalFileSystem}, these file systems are loaded
+	 * via Java's service framework.
+	 *
+	 * @return A map from the file system scheme to the corresponding file system factory.
+	 */
+	private static HashMap<String, FileSystemFactory> loadFileSystems() {
+		final HashMap<String, FileSystemFactory> map = new HashMap<>();
+
+		// by default, we always have the local file system factory
+		map.put("file", new LocalFileSystemFactory());
+
+		LOG.debug("Loading extension file systems via services");
+
+		try {
+			ServiceLoader<FileSystemFactory> serviceLoader = ServiceLoader.load(FileSystemFactory.class);
+			Iterator<FileSystemFactory> iter = serviceLoader.iterator();
+
+			// we explicitly use an iterator here (rather than for-each) because that way
+			// we can catch errors in individual service instantiations
+
+			//noinspection WhileLoopReplaceableByForEach
+			while (iter.hasNext()) {
+				try {
+					FileSystemFactory factory = iter.next();
+					String scheme = factory.getScheme();
+					map.put(scheme, factory);
+					LOG.debug("Added file system {}:{}", scheme, factory.getClass().getName());
+				}
+				catch (Throwable t) {
+					// catching Throwable here to handle various forms of class loading
+					// and initialization errors
+					ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+				LOG.error("Failed to load a file system via services", t);
+				}
+			}
+		}
+		catch (Throwable t) {
+			// catching Throwable here to handle various forms of class loading
+			// and initialization errors
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			LOG.error("Failed to load additional file systems via services", t);
+		}
+
+		return map;
+	}
+	
+	/**
+	 * Utility loader for the Hadoop file system factory.
+	 * We treat the Hadoop FS factory in a special way, because we use it as a
+	 * catch-all for file system schemes not supported directly in Flink.
+	 *
+	 * <p>This method does a set of eager checks for availability of certain classes, to
+	 * be able to give better error messages.
+	 */
+	private static FileSystemFactory loadHadoopFsFactory() {
+		final ClassLoader cl = FileSystem.class.getClassLoader();
+
+		// first, see if the Flink runtime classes are available
+		final Class<? extends FileSystemFactory> factoryClass;
+		try {
+			factoryClass = Class.forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl).asSubclass(FileSystemFactory.class);
+		}
+		catch (ClassNotFoundException e) {
+			LOG.info("No Flink runtime dependency present. " + 
+					"The extended set of supported File Systems via Hadoop is not available.");
+			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
+		}
+		catch (Exception | LinkageError e) {
+			LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
+			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
+		}
+
+		// check (for eager and better exception messages) if the Hadoop classes are available here
+		try {
+			Class.forName("org.apache.hadoop.conf.Configuration", false, cl);
+			Class.forName("org.apache.hadoop.fs.FileSystem", false, cl);
+		}
+		catch (ClassNotFoundException e) {
+			LOG.info("Hadoop is not in the classpath/dependencies. " +
+					"The extended set of supported File Systems via Hadoop is not available.");
+			return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
+		}
+
+		// Create the factory.
+		try {
+			return factoryClass.newInstance();
+		}
+		catch (Exception | LinkageError e) {
+			LOG.warn("Flink's Hadoop file system factory could not be created", e);
+			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * An identifier of a file system, via its scheme and its authority.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
index 503f21f..982da35 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
@@ -34,6 +34,11 @@ import java.net.URI;
 public interface FileSystemFactory {
 
 	/**
+	 * Gets the scheme of the file system created by this factory.
+	 */
+	String getScheme();
+
+	/**
 	 * Applies the given configuration to this factory. All future file system
 	 * instantiations via {@link #create(URI)} should take the configuration into
 	 * account.

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
new file mode 100644
index 0000000..234b49f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system factory that throws an UnsupportedFileSystemSchemeException when called.
+ */
+@Internal
+class UnsupportedSchemeFactory implements FileSystemFactory {
+
+	private final String exceptionMessage;
+
+	@Nullable
+	private final Throwable exceptionCause;
+
+	public UnsupportedSchemeFactory(String exceptionMessage) {
+		this(exceptionMessage, null);
+	}
+
+	public UnsupportedSchemeFactory(String exceptionMessage, @Nullable Throwable exceptionCause) {
+		this.exceptionMessage = checkNotNull(exceptionMessage);
+		this.exceptionCause = exceptionCause;
+	}
+
+	@Override
+	public String getScheme() {
+		return "n/a";
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		// nothing to do here
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		if (exceptionCause == null) {
+			throw new UnsupportedFileSystemSchemeException(exceptionMessage);
+		}
+		else {
+			throw new UnsupportedFileSystemSchemeException(exceptionMessage, exceptionCause);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java
deleted file mode 100644
index ed584ef..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A utility class to check and reflectively load the Hadoop file system factory.
- */
-public class HadoopFileSystemFactoryLoader {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystemFactoryLoader.class);
-
-	private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
-
-	private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
-
-	private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
-
-
-	/**
-	 * Loads the FileSystemFactory for the Hadoop-backed file systems.
-	 */
-	public static FileSystemFactory loadFactory() {
-		final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
-
-		// first, see if the Flink runtime classes are available
-		final Class<? extends FileSystemFactory> factoryClass;
-		try {
-			factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
-		}
-		catch (ClassNotFoundException e) {
-			LOG.info("No Flink runtime dependency present - the extended set of supported File Systems " +
-					"via Hadoop is not available.");
-			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
-		}
-		catch (Exception | LinkageError e) {
-			LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
-			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
-		}
-
-		// check (for eager and better exception messages) if the Hadoop classes are available here
-		try {
-			Class.forName(HADOOP_CONFIG_CLASS, false, cl);
-			Class.forName(HADOOP_FS_CLASS, false, cl);
-		}
-		catch (ClassNotFoundException e) {
-			LOG.info("Hadoop is not in the classpath/dependencies - the extended set of supported File Systems " +
-					"via Hadoop is not available.");
-			return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
-		}
-
-		// Create the factory.
-		try {
-			return factoryClass.newInstance();
-		}
-		catch (Exception | LinkageError e) {
-			LOG.warn("Flink's Hadoop file system factory could not be created", e);
-			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java
deleted file mode 100644
index fc04de5..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-
-import java.net.URI;
-
-/**
- * A factory for the {@link LocalFileSystem}.
- */
-@PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
-
-	@Override
-	public void configure(Configuration config) {
-		// the local file system takes no configuration, so nothing to do here
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) {
-		return LocalFileSystem.getSharedInstance();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java
deleted file mode 100644
index 271e5bc..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
-
-/**
- * A factory for the MapR file system.
- * 
- * <p>This factory tries to reflectively instantiate the MapR file system. It can only be
- * used when the MapR FS libraries are in the classpath.
- */
-public class MapRFsFactory implements FileSystemFactory {
-
-	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
-
-	@Override
-	public void configure(Configuration config) {
-		// nothing to configure based on the configuration here
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		try {
-			Class<? extends FileSystem> fsClass = Class.forName(
-					MAPR_FILESYSTEM_CLASS, false, getClass().getClassLoader()).asSubclass(FileSystem.class);
-
-			Constructor<? extends FileSystem> constructor = fsClass.getConstructor(URI.class);
-
-			try {
-				return constructor.newInstance(fsUri);
-			}
-			catch (InvocationTargetException e) {
-				throw e.getTargetException();
-			}
-		}
-		catch (ClassNotFoundException e) {
-			throw new IOException("Could not load MapR file system class '" + MAPR_FILESYSTEM_CLASS + 
-					"\'. Please make sure the Flink runtime classes are part of the classpath or dependencies.", e);
-		}
-		catch (LinkageError e) {
-			throw new IOException("Some of the MapR FS or required Hadoop classes seem to be missing or incompatible. " 
-					+ "Please check that a compatible version of the MapR Hadoop libraries is in the classpath.", e);
-		}
-		catch (IOException e) {
-			throw e;
-		}
-		catch (Throwable t) {
-			throw new IOException("Could not instantiate MapR file system class '" + MAPR_FILESYSTEM_CLASS + "'.", t);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java
deleted file mode 100644
index 8464b63..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.net.URI;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A file system factory to throw an UnsupportedFileSystemSchemeException when called.
- */
-public class UnsupportedSchemeFactory implements FileSystemFactory {
-
-	private final String exceptionMessage;
-
-	@Nullable
-	private final Throwable exceptionCause;
-
-	public UnsupportedSchemeFactory(String exceptionMessage) {
-		this(exceptionMessage, null);
-	}
-
-	public UnsupportedSchemeFactory(String exceptionMessage, @Nullable Throwable exceptionCause) {
-		this.exceptionMessage = checkNotNull(exceptionMessage);
-		this.exceptionCause = exceptionCause;
-	}
-
-	@Override
-	public void configure(Configuration config) {
-		// nothing to do here
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		if (exceptionCause == null) {
-			throw new UnsupportedFileSystemSchemeException(exceptionMessage);
-		}
-		else {
-			throw new UnsupportedFileSystemSchemeException(exceptionMessage, exceptionCause);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
new file mode 100644
index 0000000..7cbc2bd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.net.URI;
+
+/**
+ * A factory for the {@link LocalFileSystem}.
+ */
+@PublicEvolving
+public class LocalFileSystemFactory implements FileSystemFactory {
+
+	@Override
+	public String getScheme() {
+		return LocalFileSystem.getLocalFsURI().getScheme();
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		// the local file system takes no configuration, so nothing to do here
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) {
+		return LocalFileSystem.getSharedInstance();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
index be73798..01f8680 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
@@ -76,12 +76,6 @@ public class DelimitedInputFormatSamplingTest {
 	@BeforeClass
 	public static void initialize() {
 		try {
-			TestFileSystem.registerTestFileSysten();
-		} catch (Throwable t) {
-			Assert.fail("Could not setup the mock test filesystem.");
-		}
-		
-		try {
 			// make sure we do 4 samples
 			CONFIG = TestConfigUtils.loadGlobalConf(
 				new String[] { ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
index 1e5a7b0..b799152 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -19,9 +19,7 @@
 package org.apache.flink.testutils;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.URI;
-import java.util.Map;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -32,8 +30,14 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileStatus;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 
+/**
+ * A test file system. This also has a service entry in the test
+ * resources, to be loaded during tests.
+ */
 public class TestFileSystem extends LocalFileSystem {
-	
+
+	public static final String SCHEME = "test";
+
 	private static int streamOpenCounter;
 	
 	public static int getNumtimeStreamOpened() {
@@ -74,24 +78,17 @@ public class TestFileSystem extends LocalFileSystem {
 
 	@Override
 	public URI getUri() {
-		return URI.create("test:///");
-	}
-
-	public static void registerTestFileSysten() throws Exception {
-		Class<FileSystem> fsClass = FileSystem.class;
-		Field dirField = fsClass.getDeclaredField("FS_FACTORIES");
-
-		dirField.setAccessible(true);
-		@SuppressWarnings("unchecked")
-		Map<String, FileSystemFactory> map = (Map<String, FileSystemFactory>) dirField.get(null);
-		dirField.setAccessible(false);
-
-		map.put("test", new TestFileSystemFactory());
+		return URI.create(SCHEME + ":///");
 	}
 
 	// ------------------------------------------------------------------------
 
-	private static final class TestFileSystemFactory implements FileSystemFactory {
+	public static final class TestFileSystemFactory implements FileSystemFactory {
+
+		@Override
+		public String getScheme() {
+			return SCHEME;
+		}
 
 		@Override
 		public void configure(Configuration config) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..5a3a31d
--- /dev/null
+++ b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index ffad448..8120cd8 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -205,6 +205,21 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- Default file system support. The Hadoop and MapR dependencies -->
+		<!--      are optional, so they are not added to the dist jar      -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-mapr-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- Concrete logging framework - we add this only here (and not in the 
 			root POM) to not tie the projects to one specific framework and make
 			it easier for users to swap logging frameworks -->

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/pom.xml b/flink-filesystems/flink-hadoop-fs/pom.xml
new file mode 100644
index 0000000..7cb4b4c
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-filesystems</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-hadoop-fs</artifactId>
+	<name>flink-hadoop-fs</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- Hadoop as an optional dependency, so we can hard depend on this without -->
+		<!-- pulling in Hadoop by default -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-hadoop2</artifactId>
+			<version>${project.version}</version>
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
new file mode 100644
index 0000000..1484c95
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.BlockLocation;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implementation of the {@link BlockLocation} interface for the
+ * Hadoop Distributed File System.
+ */
+public final class HadoopBlockLocation implements BlockLocation {
+
+	/**
+	 * Specifies the character separating the hostname from the domain name.
+	 */
+	private static final char DOMAIN_SEPARATOR = '.';
+
+	/**
+	 * Regular expression for an IPv4 address.
+	 */
+	private static final Pattern IPV4_PATTERN = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");
+
+	/**
+	 * The original Hadoop block location object.
+	 */
+	private final org.apache.hadoop.fs.BlockLocation blockLocation;
+
+	/**
+	 * Stores the hostnames without the domain suffix.
+	 */
+	private String[] hostnames;
+
+	/**
+	 * Creates a new block location.
+	 *
+	 * @param blockLocation
+	 *        the original HDFS block location
+	 */
+	public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
+
+		this.blockLocation = blockLocation;
+	}
+
+	@Override
+	public String[] getHosts() throws IOException {
+
+		/**
+		 * Unfortunately, the Hadoop API is not precise about whether the list returned by BlockLocation.getHosts()
+		 * contains the hostnames with or without their respective domain suffix (FQDN or not). We have witnessed both,
+		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
+		 * sure it does not contain the domain suffix.
+		 */
+		if (this.hostnames == null) {
+
+			final String[] hadoopHostnames = blockLocation.getHosts();
+			this.hostnames = new String[hadoopHostnames.length];
+
+			for (int i = 0; i < hadoopHostnames.length; ++i) {
+				this.hostnames[i] = stripHostname(hadoopHostnames[i]);
+			}
+		}
+
+		return this.hostnames;
+	}
+
+	/**
+	 * Looks for a domain suffix in a FQDN and strips it if present.
+	 *
+	 * @param originalHostname
+	 *        the original hostname, possibly an FQDN
+	 * @return the stripped hostname without the domain suffix
+	 */
+	private static String stripHostname(final String originalHostname) {
+
+		// Check if the hostname contains the domain separator character
+		final int index = originalHostname.indexOf(DOMAIN_SEPARATOR);
+		if (index == -1) {
+			return originalHostname;
+		}
+
+		// Make sure we are not stripping an IPv4 address
+		final Matcher matcher = IPV4_PATTERN.matcher(originalHostname);
+		if (matcher.matches()) {
+			return originalHostname;
+		}
+
+		if (index == 0) {
+			throw new IllegalStateException("Hostname " + originalHostname + " starts with a " + DOMAIN_SEPARATOR);
+		}
+
+		return originalHostname.substring(0, index);
+	}
+
+	@Override
+	public long getLength() {
+
+		return this.blockLocation.getLength();
+	}
+
+	@Override
+	public long getOffset() {
+
+		return this.blockLocation.getOffset();
+	}
+
+	@Override
+	public int compareTo(final BlockLocation o) {
+
+		final long diff = getOffset() - o.getOffset();
+
+		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
new file mode 100644
index 0000000..da50c4c
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Concrete implementation of the {@link FSDataInputStream} for Hadoop's input streams.
+ * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
+ */
+public final class HadoopDataInputStream extends FSDataInputStream {
+
+	/**
+	 * Minimum amount of bytes to skip forward before we issue a seek instead of discarding read.
+	 *
+	 * <p>The current value is just a magic number. In the long run, this value could become configurable, but for now it
+	 * is a conservative, relatively small value that should bring safe improvements for small skips (e.g. in reading
+	 * meta data), which would otherwise hurt the most with frequent seeks.
+	 *
+	 * <p>The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
+	 * For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
+	 * avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
+	 * be the amount of bytes that can be consumed sequentially within the seek time. Unfortunately, seek time is not
+	 * constant and devices, OS, and DFS potentially also use read buffers and read-ahead.
+	 */
+	public static final int MIN_SKIP_BYTES = 1024 * 1024;
+
+	/** The internal stream. */
+	private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
+
+	/**
+	 * Creates a new data input stream from the given Hadoop input stream.
+	 *
+	 * @param fsDataInputStream The Hadoop input stream
+	 */
+	public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
+		this.fsDataInputStream = checkNotNull(fsDataInputStream);
+	}
+
+	@Override
+	public void seek(long seekPos) throws IOException {
+		// We do some optimizations here, so that distributed FS implementations do not
+		// perform expensive seeks when they are actually not needed.
+		long delta = seekPos - getPos();
+
+		if (delta > 0L && delta <= MIN_SKIP_BYTES) {
+			// Instead of a small forward seek, we skip over the gap
+			skipFully(delta);
+		} else if (delta != 0L) {
+			// For larger gaps and backward seeks, we do a real seek
+			forceSeek(seekPos);
+		} // Do nothing if delta is zero.
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return fsDataInputStream.getPos();
+	}
+
+	@Override
+	public int read() throws IOException {
+		return fsDataInputStream.read();
+	}
+
+	@Override
+	public void close() throws IOException {
+		fsDataInputStream.close();
+	}
+
+	@Override
+	public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
+		return fsDataInputStream.read(buffer, offset, length);
+	}
+
+	@Override
+	public int available() throws IOException {
+		return fsDataInputStream.available();
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		return fsDataInputStream.skip(n);
+	}
+
+	/**
+	 * Gets the wrapped Hadoop input stream.
+	 * @return The wrapped Hadoop input stream.
+	 */
+	public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
+		return fsDataInputStream;
+	}
+
+	/**
+	 * Positions the stream to the given location. In contrast to {@link #seek(long)}, this method will
+	 * always issue a "seek" command to the dfs and may not replace it by {@link #skip(long)} for small seeks.
+	 *
+	 * <p>Notice that the underlying DFS implementation can still decide to do skip instead of seek.
+	 *
+	 * @param seekPos the position to seek to.
+	 * @throws IOException
+	 */
+	public void forceSeek(long seekPos) throws IOException {
+		fsDataInputStream.seek(seekPos);
+	}
+
+	/**
+	 * Skips over a given amount of bytes in the stream.
+	 *
+	 * @param bytes the number of bytes to skip.
+	 * @throws IOException
+	 */
+	public void skipFully(long bytes) throws IOException {
+		while (bytes > 0) {
+			bytes -= fsDataInputStream.skip(bytes);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
new file mode 100644
index 0000000..1b8d1a3
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+
+/**
+ * Concrete implementation of the {@link FSDataOutputStream} for Hadoop's output streams.
+ * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
+ */
+public class HadoopDataOutputStream extends FSDataOutputStream {
+
+	private final org.apache.hadoop.fs.FSDataOutputStream fdos;
+
+	public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
+		if (fdos == null) {
+			throw new NullPointerException();
+		}
+		this.fdos = fdos;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		fdos.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		fdos.write(b, off, len);
+	}
+
+	@Override
+	public void close() throws IOException {
+		fdos.close();
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return fdos.getPos();
+	}
+
+	@Override
+	public void flush() throws IOException {
+		fdos.hflush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		fdos.hsync();
+	}
+
+	/**
+	 * Gets the wrapped Hadoop output stream.
+	 * @return The wrapped Hadoop output stream.
+	 */
+	public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream() {
+		return fdos;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
new file mode 100644
index 0000000..17bb334
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+
+/**
+ * Concrete implementation of the {@link FileStatus} interface for the
+ * Hadoop Distributed File System.
+ */
+public final class HadoopFileStatus implements FileStatus {
+
+	private org.apache.hadoop.fs.FileStatus fileStatus;
+
+	/**
+	 * Creates a new file status from a HDFS file status.
+	 *
+	 * @param fileStatus
+	 *        the HDFS file status
+	 */
+	public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
+		this.fileStatus = fileStatus;
+	}
+
+	@Override
+	public long getLen() {
+		return fileStatus.getLen();
+	}
+
+	@Override
+	public long getBlockSize() {
+		long blocksize = fileStatus.getBlockSize();
+		if (blocksize > fileStatus.getLen()) {
+			return fileStatus.getLen();
+		}
+
+		return blocksize;
+	}
+
+	@Override
+	public long getAccessTime() {
+		return fileStatus.getAccessTime();
+	}
+
+	@Override
+	public long getModificationTime() {
+		return fileStatus.getModificationTime();
+	}
+
+	@Override
+	public short getReplication() {
+		return fileStatus.getReplication();
+	}
+
+	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
+		return this.fileStatus;
+	}
+
+	@Override
+	public Path getPath() {
+		return new Path(fileStatus.getPath().toString());
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public boolean isDir() {
+		return fileStatus.isDir();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
new file mode 100644
index 0000000..5970c9d
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link FileSystem} that wraps an {@link org.apache.hadoop.fs.FileSystem Hadoop File System}.
+ */
+public class HadoopFileSystem extends FileSystem {
+
+	/** The wrapped Hadoop File System. */
+	private final org.apache.hadoop.fs.FileSystem fs;
+
+	/**
+	 * Wraps the given Hadoop File System object as a Flink File System object.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+	 */
+	public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+	}
+
+	/**
+	 * Gets the underlying Hadoop FileSystem.
+	 * @return The underlying Hadoop FileSystem.
+	 */
+	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
+		return this.fs;
+	}
+
+	// ------------------------------------------------------------------------
+	//  file system methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Path getWorkingDirectory() {
+		return new Path(this.fs.getWorkingDirectory().toUri());
+	}
+
+	@Override
+	public Path getHomeDirectory() {
+		return new Path(this.fs.getHomeDirectory().toUri());
+	}
+
+	@Override
+	public URI getUri() {
+		return fs.getUri();
+	}
+
+	@Override
+	public FileStatus getFileStatus(final Path f) throws IOException {
+		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
+		return new HadoopFileStatus(status);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
+			throws IOException {
+		if (!(file instanceof HadoopFileStatus)) {
+			throw new IOException("file is not an instance of HadoopFileStatus");
+		}
+
+		final HadoopFileStatus f = (HadoopFileStatus) file;
+
+		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
+			start, len);
+
+		// Wrap the HDFS-specific block location objects
+		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
+		for (int i = 0; i < distBlkLocations.length; i++) {
+			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
+		}
+
+		return distBlkLocations;
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
+		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
+		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f) throws IOException {
+		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
+		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	@SuppressWarnings("deprecation")
+	public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
+			final short replication, final long blockSize) throws IOException {
+		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
+			new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
+		return new HadoopDataOutputStream(fdos);
+	}
+
+	@Override
+	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
+		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
+			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
+		return new HadoopDataOutputStream(fsDataOutputStream);
+	}
+
+	@Override
+	public boolean delete(final Path f, final boolean recursive) throws IOException {
+		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()), recursive);
+	}
+
+	@Override
+	public FileStatus[] listStatus(final Path f) throws IOException {
+		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
+		final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+		// Convert types
+		for (int i = 0; i < files.length; i++) {
+			files[i] = new HadoopFileStatus(hadoopFiles[i]);
+		}
+
+		return files;
+	}
+
+	@Override
+	public boolean mkdirs(final Path f) throws IOException {
+		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
+	}
+
+	@Override
+	public boolean rename(final Path src, final Path dst) throws IOException {
+		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
+			new org.apache.hadoop.fs.Path(dst.toString()));
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public long getDefaultBlockSize() {
+		return this.fs.getDefaultBlockSize();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return true;
+	}
+}
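
A minimal usage sketch (illustration only, not part of this commit); the namenode
address "hdfs://namenode:9000" is a placeholder and IOException handling is omitted:

    // wrap an initialized Hadoop file system as a Flink FileSystem
    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
    org.apache.hadoop.fs.FileSystem hdfs =
        org.apache.hadoop.fs.FileSystem.get(URI.create("hdfs://namenode:9000/"), hadoopConf);

    FileSystem flinkFs = new HadoopFileSystem(hdfs);
    for (FileStatus status : flinkFs.listStatus(new Path("/data"))) {
        System.out.println(status.getPath());
    }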

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
new file mode 100644
index 0000000..50e64e1
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system factory for Hadoop-based file systems.
+ *
+ * <p>This factory calls Hadoop's mechanism to find a file system implementation for a given file
+ * system scheme (a {@link org.apache.hadoop.fs.FileSystem}) and wraps it as a Flink file system
+ * (a {@link org.apache.flink.core.fs.FileSystem}).
+ */
+public class HadoopFsFactory implements FileSystemFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopFsFactory.class);
+
+	/** Flink's configuration object. */
+	private Configuration flinkConfig;
+
+	/** Hadoop's configuration for the file systems. */
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	@Override
+	public String getScheme() {
+		// the hadoop factory creates various schemes
+		return "*";
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		flinkConfig = config;
+		hadoopConfig = null; // reset the Hadoop Config
+	}
+
+	@Override
+	public HadoopFileSystem create(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "fsUri");
+
+		final String scheme = fsUri.getScheme();
+		checkArgument(scheme != null, "file system has null scheme");
+
+		// from here on, we need to handle errors due to missing optional
+		// dependency classes
+		try {
+			// -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
+
+			final org.apache.hadoop.conf.Configuration hadoopConfig;
+			if (this.hadoopConfig != null) {
+				hadoopConfig = this.hadoopConfig;
+			}
+			else if (flinkConfig != null) {
+				hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
+				this.hadoopConfig = hadoopConfig;
+			}
+			else {
+				LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
+						+ " Using configuration from the classpath.");
+
+				hadoopConfig = new org.apache.hadoop.conf.Configuration();
+			}
+
+			// -- (2) get the Hadoop file system class for that scheme
+
+			final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
+			try {
+				fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
+			}
+			catch (IOException e) {
+				throw new UnsupportedFileSystemSchemeException(
+						"Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
+								"Either no file system implementation exists for that scheme, " +
+								"or the relevant classes are missing from the classpath.", e);
+			}
+
+			// -- (3) instantiate the Hadoop file system
+
+			LOG.debug("Instantiating Hadoop file system {} for scheme {}", fsClass.getName(), scheme);
+
+			final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
+
+			// -- (4) create the proper URI to initialize the file system
+
+			final URI initUri;
+			if (fsUri.getAuthority() != null) {
+				initUri = fsUri;
+			}
+			else {
+				LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)", fsUri);
+
+				String configEntry = hadoopConfig.get("fs.defaultFS", null);
+				if (configEntry == null) {
+					// fs.default.name deprecated as of hadoop 2.2.0 - see
+					// http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
+					configEntry = hadoopConfig.get("fs.default.name", null);
+				}
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
+				}
+
+				if (configEntry == null) {
+					throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
+							"Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
+				}
+				else {
+					try {
+						initUri = URI.create(configEntry);
+					}
+					catch (IllegalArgumentException e) {
+						throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
+								"The configuration contains an invalid file system default name " +
+								"('fs.default.name' or 'fs.defaultFS'): " + configEntry, e);
+					}
+
+					if (initUri.getAuthority() == null) {
+						throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
+								"Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
+								"contains no valid authority component (like hdfs namenode, S3 host, etc)");
+					}
+				}
+			}
+
+			// -- (5) configure the Hadoop file system
+
+			try {
+				hadoopFs.initialize(initUri, hadoopConfig);
+			}
+			catch (UnknownHostException e) {
+				String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
+						"), specified by either the file URI or the configuration, cannot be resolved.";
+
+				throw new IOException(message, e);
+			}
+
+			// all good, return the file system
+			return new HadoopFileSystem(hadoopFs);
+		}
+		catch (ReflectiveOperationException | LinkageError e) {
+			throw new UnsupportedFileSystemSchemeException("Cannot support file system for '" + fsUri.getScheme() +
+					"' via Hadoop, because Hadoop is not in the classpath, or some classes " +
+					"are missing from the classpath.", e);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
+		}
+	}
+
+	private static String getMissingAuthorityErrorPrefix(URI fsURI) {
+		return "The given file system URI (" + fsURI.toString() + ") did not describe the authority " +
+				"(like for example HDFS NameNode address/port or S3 host). " +
+				"The attempt to use a configured default authority failed: ";
+	}
+}
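
A minimal usage sketch (illustration only, not part of this commit); the URI is a
placeholder, and in practice the factory is discovered and invoked by Flink's file
system loading code rather than called directly:

    HadoopFsFactory factory = new HadoopFsFactory();
    factory.configure(new Configuration());   // Flink configuration

    // resolves the 'hdfs' scheme via Hadoop's FileSystem.getFileSystemClass(...)
    HadoopFileSystem fs = factory.create(URI.create("hdfs://namenode:9000/"));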

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
new file mode 100644
index 0000000..8bfcb5c
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+
+/**
+ * Utility class for working with Hadoop-related classes. This should only be used if Hadoop
+ * is on the classpath.
+ */
+public class HadoopUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+
+	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
+	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
+
+		Configuration result = new Configuration();
+		boolean foundHadoopConfiguration = false;
+
+		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path
+		// and the HDFS configuration, trying several locations for Hadoop's configuration files.
+		// 1. approach: paths specified in the Flink configuration
+		final String hdfsDefaultPath =
+			flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+
+		if (hdfsDefaultPath != null) {
+			result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
+			LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
+			foundHadoopConfiguration = true;
+		} else {
+			LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
+		}
+
+		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+		if (hdfsSitePath != null) {
+			result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
+			LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
+			foundHadoopConfiguration = true;
+		} else {
+			LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
+		}
+
+		// 2. approach: environment variables
+		String[] possibleHadoopConfPaths = new String[4];
+		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+		final String hadoopHome = System.getenv("HADOOP_HOME");
+		if (hadoopHome != null) {
+			possibleHadoopConfPaths[2] = hadoopHome + "/conf";
+			possibleHadoopConfPaths[3] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+		}
+
+		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+			if (possibleHadoopConfPath != null) {
+				if (new File(possibleHadoopConfPath).exists()) {
+					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+						LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+						foundHadoopConfiguration = true;
+					}
+					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+						LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+						foundHadoopConfiguration = true;
+					}
+				}
+			}
+		}
+
+		if (!foundHadoopConfiguration) {
+			LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
+				"(Flink configuration, environment variables).");
+		}
+
+		return result;
+	}
+
+	/**
+	 * Indicates whether the current user has an HDFS delegation token.
+	 */
+	public static boolean hasHDFSDelegationToken() throws Exception {
+		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+		for (Token<? extends TokenIdentifier> token : usrTok) {
+			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+				return true;
+			}
+		}
+		return false;
+	}
+}
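
A minimal usage sketch (illustration only, not part of this commit); the directory
"/etc/hadoop/conf" is a placeholder, and HADOOP_CONF_DIR / HADOOP_HOME are consulted
as well if set in the environment:

    org.apache.flink.configuration.Configuration flinkConf =
        new org.apache.flink.configuration.Configuration();
    flinkConf.setString(ConfigConstants.PATH_HADOOP_CONFIG, "/etc/hadoop/conf");

    Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(flinkConf);
    String defaultFs = hadoopConf.get("fs.defaultFS", null);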

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
new file mode 100644
index 0000000..bb2c492
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that validate the loading of the Hadoop configuration, relative to
+ * entries in the Flink configuration and the environment variables.
+ */
+public class HadoopConfigLoadingTest {
+
+	private static final String IN_CP_CONFIG_KEY = "cp_conf_key";
+	private static final String IN_CP_CONFIG_VALUE = "oompf!";
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void loadFromClasspathByDefault() {
+		org.apache.hadoop.conf.Configuration hadoopConf =
+				HadoopUtils.getHadoopConfiguration(new Configuration());
+
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadFromLegacyConfigEntries() throws Exception {
+		final String k1 = "shipmate";
+		final String v1 = "smooth sailing";
+
+		final String k2 = "pirate";
+		final String v2 = "Arrg, yer scurvy dog!";
+
+		final File file1 = tempFolder.newFile("core-site.xml");
+		final File file2 = tempFolder.newFile("hdfs-site.xml");
+
+		printConfig(file1, k1, v1);
+		printConfig(file2, k2, v2);
+
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, file1.getAbsolutePath());
+		cfg.setString(ConfigConstants.HDFS_SITE_CONFIG, file2.getAbsolutePath());
+
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(cfg);
+
+		// contains extra entries
+		assertEquals(v1, hadoopConf.get(k1, null));
+		assertEquals(v2, hadoopConf.get(k2, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadFromHadoopConfEntry() throws Exception {
+		final String k1 = "singing?";
+		final String v1 = "rain!";
+
+		final String k2 = "dancing?";
+		final String v2 = "shower!";
+
+		final File confDir = tempFolder.newFolder();
+
+		final File file1 = new File(confDir, "core-site.xml");
+		final File file2 = new File(confDir, "hdfs-site.xml");
+
+		printConfig(file1, k1, v1);
+		printConfig(file2, k2, v2);
+
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.PATH_HADOOP_CONFIG, confDir.getAbsolutePath());
+
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(cfg);
+
+		// contains extra entries
+		assertEquals(v1, hadoopConf.get(k1, null));
+		assertEquals(v2, hadoopConf.get(k2, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadFromEnvVariables() throws Exception {
+		final String k1 = "where?";
+		final String v1 = "I'm on a boat";
+		final String k2 = "when?";
+		final String v2 = "midnight";
+		final String k3 = "why?";
+		final String v3 = "what do you think?";
+		final String k4 = "which way?";
+		final String v4 = "south, always south...";
+		final String k5 = "how long?";
+		final String v5 = "an eternity";
+		final String k6 = "for real?";
+		final String v6 = "quite so...";
+
+		final File hadoopConfDir = tempFolder.newFolder();
+
+		final File hadoopHome = tempFolder.newFolder();
+
+		final File hadoopHomeConf = new File(hadoopHome, "conf");
+		final File hadoopHomeEtc = new File(hadoopHome, "etc/hadoop");
+
+		assertTrue(hadoopHomeConf.mkdirs());
+		assertTrue(hadoopHomeEtc.mkdirs());
+
+		final File file1 = new File(hadoopConfDir, "core-site.xml");
+		final File file2 = new File(hadoopConfDir, "hdfs-site.xml");
+		final File file3 = new File(hadoopHomeConf, "core-site.xml");
+		final File file4 = new File(hadoopHomeConf, "hdfs-site.xml");
+		final File file5 = new File(hadoopHomeEtc, "core-site.xml");
+		final File file6 = new File(hadoopHomeEtc, "hdfs-site.xml");
+
+		printConfig(file1, k1, v1);
+		printConfig(file2, k2, v2);
+		printConfig(file3, k3, v3);
+		printConfig(file4, k4, v4);
+		printConfig(file5, k5, v5);
+		printConfig(file6, k6, v6);
+
+		final org.apache.hadoop.conf.Configuration hadoopConf;
+
+		final Map<String, String> originalEnv = System.getenv();
+		final Map<String, String> newEnv = new HashMap<>(originalEnv);
+		newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+		newEnv.put("HADOOP_HOME", hadoopHome.getAbsolutePath());
+		try {
+			CommonTestUtils.setEnv(newEnv);
+			hadoopConf = HadoopUtils.getHadoopConfiguration(new Configuration());
+		}
+		finally {
+			CommonTestUtils.setEnv(originalEnv);
+		}
+
+		// contains extra entries
+		assertEquals(v1, hadoopConf.get(k1, null));
+		assertEquals(v2, hadoopConf.get(k2, null));
+		assertEquals(v3, hadoopConf.get(k3, null));
+		assertEquals(v4, hadoopConf.get(k4, null));
+		assertEquals(v5, hadoopConf.get(k5, null));
+		assertEquals(v6, hadoopConf.get(k6, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
+	private static void printConfig(File file, String key, String value) throws IOException {
+		try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
+			out.println("<?xml version=\"1.0\"?>");
+			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+			out.println("<configuration>");
+			out.println("\t<property>");
+			out.println("\t\t<name>" + key + "</name>");
+			out.println("\t\t<value>" + value + "</value>");
+			out.println("\t</property>");
+			out.println("</configuration>");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
new file mode 100644
index 0000000..c6cf0eb
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the {@link HadoopDataInputStream}.
+ */
+public class HadoopDataInputStreamTest {
+
+	private FSDataInputStream verifyInputStream;
+	private HadoopDataInputStream testInputStream;
+
+	@Test
+	public void testSeekSkip() throws IOException {
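+		// forward seeks of at most MIN_SKIP_BYTES should be realized via skip(),
+		// all other non-zero deltas via seek() (verified in seekAndAssert)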
+		verifyInputStream = spy(new FSDataInputStream(new SeekableByteArrayInputStream(new byte[2 * HadoopDataInputStream.MIN_SKIP_BYTES])));
+		testInputStream = new HadoopDataInputStream(verifyInputStream);
+		seekAndAssert(10);
+		seekAndAssert(10 + HadoopDataInputStream.MIN_SKIP_BYTES + 1);
+		seekAndAssert(testInputStream.getPos() - 1);
+		seekAndAssert(testInputStream.getPos() + 1);
+		seekAndAssert(testInputStream.getPos() - HadoopDataInputStream.MIN_SKIP_BYTES);
+		seekAndAssert(testInputStream.getPos());
+		seekAndAssert(0);
+		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES);
+		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES - 1);
+
+		try {
+			seekAndAssert(-1);
+			Assert.fail();
+		} catch (Exception ignore) {
+		}
+
+		try {
+			seekAndAssert(-HadoopDataInputStream.MIN_SKIP_BYTES - 1);
+			Assert.fail();
+		} catch (Exception ignore) {
+		}
+	}
+
+	private void seekAndAssert(long seekPos) throws IOException {
+		Assert.assertEquals(verifyInputStream.getPos(), testInputStream.getPos());
+		long delta = seekPos - testInputStream.getPos();
+		testInputStream.seek(seekPos);
+
+		if (delta > 0L && delta <= HadoopDataInputStream.MIN_SKIP_BYTES) {
+			verify(verifyInputStream, atLeastOnce()).skip(anyLong());
+			verify(verifyInputStream, never()).seek(anyLong());
+		} else if (delta != 0L) {
+			verify(verifyInputStream, atLeastOnce()).seek(seekPos);
+			verify(verifyInputStream, never()).skip(anyLong());
+		} else {
+			verify(verifyInputStream, never()).seek(anyLong());
+			verify(verifyInputStream, never()).skip(anyLong());
+		}
+
+		Assert.assertEquals(seekPos, verifyInputStream.getPos());
+		reset(verifyInputStream);
+	}
+
+	private static final class SeekableByteArrayInputStream
+		extends ByteArrayInputStreamWithPos
+		implements Seekable, PositionedReadable {
+
+		public SeekableByteArrayInputStream(byte[] buffer) {
+			super(buffer);
+		}
+
+		@Override
+		public void seek(long pos) throws IOException {
+			setPosition((int) pos);
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return getPosition();
+		}
+
+		@Override
+		public boolean seekToNewSource(long targetPos) throws IOException {
+			return false;
+		}
+
+		@Override
+		public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void readFully(long position, byte[] buffer) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+}