Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 17:54:03 UTC

[1/3] flink git commit: [FLINK-7768] [core] Load File Systems via Java Service abstraction

Repository: flink
Updated Branches:
  refs/heads/master bad3df54d -> 77e3701ca


http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
deleted file mode 100644
index 21c18bc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests for the {@link HadoopDataInputStream}.
- */
-public class HadoopDataInputStreamTest {
-
-	private FSDataInputStream verifyInputStream;
-	private HadoopDataInputStream testInputStream;
-
-	@Test
-	public void testSeekSkip() throws IOException {
-		verifyInputStream = spy(new FSDataInputStream(new SeekableByteArrayInputStream(new byte[2 * HadoopDataInputStream.MIN_SKIP_BYTES])));
-		testInputStream = new HadoopDataInputStream(verifyInputStream);
-		seekAndAssert(10);
-		seekAndAssert(10 + HadoopDataInputStream.MIN_SKIP_BYTES + 1);
-		seekAndAssert(testInputStream.getPos() - 1);
-		seekAndAssert(testInputStream.getPos() + 1);
-		seekAndAssert(testInputStream.getPos() - HadoopDataInputStream.MIN_SKIP_BYTES);
-		seekAndAssert(testInputStream.getPos());
-		seekAndAssert(0);
-		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES);
-		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES - 1);
-
-		try {
-			seekAndAssert(-1);
-			Assert.fail();
-		} catch (Exception ignore) {
-		}
-
-		try {
-			seekAndAssert(-HadoopDataInputStream.MIN_SKIP_BYTES - 1);
-			Assert.fail();
-		} catch (Exception ignore) {
-		}
-	}
-
-	private void seekAndAssert(long seekPos) throws IOException {
-		Assert.assertEquals(verifyInputStream.getPos(), testInputStream.getPos());
-		long delta = seekPos - testInputStream.getPos();
-		testInputStream.seek(seekPos);
-
-		if (delta > 0L && delta <= HadoopDataInputStream.MIN_SKIP_BYTES) {
-			verify(verifyInputStream, atLeastOnce()).skip(anyLong());
-			verify(verifyInputStream, never()).seek(anyLong());
-		} else if (delta != 0L) {
-			verify(verifyInputStream, atLeastOnce()).seek(seekPos);
-			verify(verifyInputStream, never()).skip(anyLong());
-		} else {
-			verify(verifyInputStream, never()).seek(anyLong());
-			verify(verifyInputStream, never()).skip(anyLong());
-		}
-
-		Assert.assertEquals(seekPos, verifyInputStream.getPos());
-		reset(verifyInputStream);
-	}
-
-	private static final class SeekableByteArrayInputStream
-		extends ByteArrayInputStreamWithPos
-		implements Seekable, PositionedReadable {
-
-		public SeekableByteArrayInputStream(byte[] buffer) {
-			super(buffer);
-		}
-
-		@Override
-		public void seek(long pos) throws IOException {
-			setPosition((int) pos);
-		}
-
-		@Override
-		public long getPos() throws IOException {
-			return getPosition();
-		}
-
-		@Override
-		public boolean seekToNewSource(long targetPos) throws IOException {
-			return false;
-		}
-
-		@Override
-		public int read(long position, byte[] buffer, int offset, int length) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void readFully(long position, byte[] buffer) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f264120..7f09abc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@ under the License.
 		<module>flink-java</module>
 		<module>flink-java8</module>
 		<module>flink-scala</module>
+		<module>flink-filesystems</module>
 		<module>flink-runtime</module>
 		<module>flink-runtime-web</module>
 		<module>flink-optimizer</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 77bad24..dd75d36 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -301,6 +301,23 @@ check_shaded_artifacts() {
 		return 1
 	fi
 
+
+	HADOOP=`cat allClasses | grep '^org/apache/hadoop' | wc -l`
+	if [ "$HADOOP" != "0" ]; then
+		echo "=============================================================================="
+		echo "Detected '$HADOOP' Hadoop classes in the dist jar"
+		echo "=============================================================================="
+		return 1
+	fi
+
+	MAPR=`cat allClasses | grep '^com/mapr' | wc -l`
+	if [ "$MAPR" != "0" ]; then
+		echo "=============================================================================="
+		echo "Detected '$MAPR' MapR classes in the dist jar"
+		echo "=============================================================================="
+		return 1
+	fi
+
 	return 0
 }
 


[2/3] flink git commit: [FLINK-7768] [core] Load File Systems via Java Service abstraction

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java
new file mode 100644
index 0000000..04bc917
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeFsFactoryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URLClassLoader;
+
+/**
+ * Tests that validate the behavior of the Hadoop File System Factory.
+ */
+public class HadoopFreeFsFactoryTest extends TestLogger {
+
+	/**
+	 * This test validates that the factory can be instantiated and configured even
+	 * when Hadoop classes are missing from the classpath.
+	 */
+	@Test
+	public void testHadoopFactoryInstantiationWithoutHadoop() throws Exception {
+		// we do reflection magic here to instantiate the test in another class
+		// loader, to make sure no hadoop classes are in the classpath
+
+		final String testClassName = "org.apache.flink.runtime.fs.hdfs.HadoopFreeTests";
+
+		URLClassLoader parent = (URLClassLoader) getClass().getClassLoader();
+		ClassLoader hadoopFreeClassLoader = new HadoopFreeClassLoader(parent);
+		Class<?> testClass = Class.forName(testClassName, false, hadoopFreeClassLoader);
+		Method m = testClass.getDeclaredMethod("test");
+
+		try {
+			m.invoke(null);
+		}
+		catch (InvocationTargetException e) {
+			ExceptionUtils.rethrowException(e.getTargetException(), "exception in method");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class HadoopFreeClassLoader extends URLClassLoader {
+
+		private final ClassLoader properParent;
+
+		HadoopFreeClassLoader(URLClassLoader parent) {
+			super(parent.getURLs(), null);
+			properParent = parent;
+		}
+
+		@Override
+		public Class<?> loadClass(String name) throws ClassNotFoundException {
+			if (name.startsWith("org.apache.hadoop")) {
+				throw new ClassNotFoundException(name);
+			}
+			else if (name.startsWith("org.apache.log4j")) {
+				return properParent.loadClass(name);
+			}
+			else {
+				return super.loadClass(name);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java
new file mode 100644
index 0000000..e78fe8d
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFreeTests.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
+
+import java.net.URI;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A class with tests that need to be run in a Hadoop-free environment, to test
+ * proper error handling when no Hadoop classes are available.
+ *
+ * <p>This class must be dynamically loaded in a Hadoop-free class loader.
+ */
+// this class is only instantiated via reflection
+@SuppressWarnings("unused")
+public class HadoopFreeTests {
+
+	public static void test() throws Exception {
+		// make sure no Hadoop FS classes are in the classpath
+		try {
+			Class.forName("org.apache.hadoop.fs.FileSystem");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		try {
+			Class.forName("org.apache.hadoop.conf.Configuration");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		// this method should complete without a linkage error
+		final HadoopFsFactory factory = new HadoopFsFactory();
+
+		// this method should also complete without a linkage error
+		factory.configure(new Configuration());
+
+		try {
+			factory.create(new URI("hdfs://somehost:9000/root/dir"));
+			fail("This statement should fail with an exception");
+		}
+		catch (UnsupportedFileSystemSchemeException e) {
+			// expected
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
new file mode 100644
index 0000000..1f5c932
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that validate the behavior of the Hadoop File System Factory.
+ */
+public class HadoopFsFactoryTest extends TestLogger {
+
+	@Test
+	public void testCreateHadoopFsWithoutConfig() throws Exception {
+		final URI uri = URI.create("hdfs://localhost:12345/");
+
+		HadoopFsFactory factory = new HadoopFsFactory();
+		HadoopFileSystem fs = factory.create(uri);
+
+		assertEquals(uri.getScheme(), fs.getUri().getScheme());
+		assertEquals(uri.getAuthority(), fs.getUri().getAuthority());
+		assertEquals(uri.getPort(), fs.getUri().getPort());
+	}
+
+	@Test
+	public void testCreateHadoopFsWithMissingAuthority() throws Exception {
+		final URI uri = URI.create("hdfs:///my/path");
+
+		HadoopFsFactory factory = new HadoopFsFactory();
+
+		try {
+			factory.create(uri);
+			fail("should have failed with an exception");
+		}
+		catch (IOException e) {
+			assertTrue(e.getMessage().contains("authority"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml b/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml
new file mode 100644
index 0000000..56c7d55
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/resources/core-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Values used when running unit tests.  Specify any values in here that
+     should override the default values. -->
+
+<configuration>
+    <property>
+        <name>cp_conf_key</name>
+        <value>oompf!</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties b/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
new file mode 100644
index 0000000..12bb9bc
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-filesystems</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-mapr-fs</artifactId>
+	<name>flink-mapr-fs</name>
+
+	<packaging>jar</packaging>
+
+	<repositories>
+		<repository>
+			<id>mapr-releases</id>
+			<url>http://repository.mapr.com/maven/</url>
+			<snapshots><enabled>false</enabled></snapshots>
+			<releases><enabled>true</enabled></releases>
+		</repository>
+	</repositories>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- MapR dependencies as optional dependency, so we can hard depend on this without -->
+		<!-- pulling in MapR libraries by default -->
+
+		<dependency>
+			<groupId>com.mapr.hadoop</groupId>
+			<artifactId>maprfs</artifactId>
+			<version>5.2.1-mapr</version>
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
new file mode 100644
index 0000000..058772c
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A MapR file system client for Flink.
+ *
+ * <p>Internally, this class wraps the {@link org.apache.hadoop.fs.FileSystem} implementation
+ * of the MapR file system client.
+ */
+public class MapRFileSystem extends HadoopFileSystem {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class);
+
+	/** Name of the environment variable to determine the location of the MapR
+	 * installation. */
+	private static final String MAPR_HOME_ENV = "MAPR_HOME";
+
+	/** The default location of the MapR installation. */
+	private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
+
+	/** The path relative to the MAPR_HOME where MapR stores how to access the
+	 * configured clusters. */
+	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a MapRFileSystem for the given URI.
+	 *
+	 * @param fsUri The URI describing the file system
+	 * @throws IOException Thrown if the file system could not be initialized.
+	 */
+	public MapRFileSystem(URI fsUri) throws IOException {
+		super(instantiateMapRFileSystem(fsUri));
+	}
+
+	private static org.apache.hadoop.fs.FileSystem instantiateMapRFileSystem(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "fsUri");
+
+		final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		final com.mapr.fs.MapRFileSystem fs;
+
+		final String authority = fsUri.getAuthority();
+		if (authority == null || authority.isEmpty()) {
+
+			// Use the default constructor to instantiate MapR file system object
+			fs = new com.mapr.fs.MapRFileSystem();
+		}
+		else {
+			// We have an authority, check the MapR cluster configuration to
+			// find the CLDB locations.
+			final String[] cldbLocations = getCLDBLocations(authority);
+			fs = new com.mapr.fs.MapRFileSystem(authority, cldbLocations);
+		}
+
+		// now initialize the Hadoop File System object
+		fs.initialize(fsUri, conf);
+
+		return fs;
+	}
+
+	/**
+	 * Retrieves the CLDB locations for the given MapR cluster name.
+	 *
+	 * @param authority
+	 *            the name of the MapR cluster
+	 * @return a list of CLDB locations
+	 * @throws IOException
+	 *             thrown if the CLDB locations for the given MapR cluster name
+	 *             cannot be determined
+	 */
+	private static String[] getCLDBLocations(String authority) throws IOException {
+
+		// Determine the MapR home
+		String maprHome = System.getenv(MAPR_HOME_ENV);
+		if (maprHome == null) {
+			maprHome = DEFAULT_MAPR_HOME;
+		}
+
+		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format(
+					"Trying to retrieve MapR cluster configuration from %s",
+					maprClusterConf));
+		}
+
+		if (!maprClusterConf.exists()) {
+			throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() +
+					"', assuming MapR home is '" + maprHome + "'.");
+		}
+
+		// Read the cluster configuration file, format is specified at
+		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
+
+		try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
+
+			String line;
+			while ((line = br.readLine()) != null) {
+
+				// Normalize the string
+				line = line.trim();
+				line = line.replace('\t', ' ');
+
+				final String[] fields = line.split(" ");
+				if (fields.length < 1) {
+					continue;
+				}
+
+				final String clusterName = fields[0];
+
+				if (!clusterName.equals(authority)) {
+					continue;
+				}
+
+				final List<String> cldbLocations = new ArrayList<>();
+
+				for (int i = 1; i < fields.length; ++i) {
+
+					// Make sure this is not a key-value pair MapR recently
+					// introduced in the file format along with their security
+					// features.
+					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
+						cldbLocations.add(fields[i]);
+					}
+				}
+
+				if (cldbLocations.isEmpty()) {
+					throw new IOException(
+							String.format(
+									"%s contains entry for cluster %s but no CLDB locations.",
+									maprClusterConf, authority));
+				}
+
+				return cldbLocations.toArray(new String[cldbLocations.size()]);
+			}
+
+		}
+
+		throw new IOException(String.format(
+				"Unable to find CLDB locations for cluster %s", authority));
+	}
+}

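A note on the file format parsed by getCLDBLocations() above: each line of
mapr-clusters.conf begins with the cluster name, optionally followed by key=value
flags (which the loop skips), followed by one or more CLDB host:port entries. A
hypothetical example line (hostnames invented for illustration):

    my.cluster.com secure=false cldb1.example.com:7222 cldb2.example.com:7222

For this line, getCLDBLocations("my.cluster.com") would return the two
cldb*.example.com:7222 entries.
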
http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
new file mode 100644
index 0000000..e163f63
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for the MapR file system.
+ *
+ * <p>This factory tries to reflectively instantiate the MapR file system. It can only be
+ * used when the MapR FS libraries are in the classpath.
+ */
+public class MapRFsFactory implements FileSystemFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MapRFsFactory.class);
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String getScheme() {
+		return "maprfs";
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		// nothing to configure based on the configuration here
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "fsUri");
+
+		try {
+			LOG.info("Trying to load and instantiate MapR File System");
+
+			return new MapRFileSystem(fsUri);
+		}
+		catch (LinkageError e) {
+			throw new IOException("Could not load MapR file system. " +
+					"Please make sure the MapR file system classes are part of the classpath or dependencies.", e);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Throwable t) {
+			throw new IOException("Could not instantiate MapR file system.", t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..ffc7bcf
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.fs.maprfs.MapRFsFactory
\ No newline at end of file

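A note on the mechanism: the service file above is what ties the factory into the
"Java Service abstraction" from the commit title. Below is a minimal sketch of how
such a registration is discovered at runtime, assuming only the FileSystemFactory
interface used throughout this patch; the class name and main method are
illustrative, not part of the commit:

	import org.apache.flink.core.fs.FileSystemFactory;

	import java.util.ServiceLoader;

	// Illustrative sketch: enumerate all FileSystemFactory implementations
	// registered via META-INF/services files on the classpath.
	public class FileSystemFactoryDiscovery {

		public static void main(String[] args) {
			// ServiceLoader instantiates each listed provider through its
			// no-argument constructor, e.g. the MapRFsFactory registered above.
			for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
				System.out.println(factory.getScheme() + " -> " + factory.getClass().getName());
			}
		}
	}

Because discovery goes through the classpath rather than hard-coded references, a
module like flink-mapr-fs can contribute a file system factory without flink-core
having to know about it.
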
http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
new file mode 100644
index 0000000..aee50ba
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test checks that the file system is properly accessible through the
+ * service loading abstraction.
+ */
+public class FileSystemAccessTest extends TestLogger {
+
+	@Test
+	public void testGetMapRFs() throws Exception {
+		final Path path = new Path("maprfs:///my/path");
+
+		FileSystem fs = path.getFileSystem();
+		assertEquals(path.toUri().getScheme(), fs.getUri().getScheme());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
new file mode 100644
index 0000000..110fce3
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * A class with tests that need to be run in a MapR/Hadoop-free environment,
+ * to test proper error handling when no Hadoop classes are available.
+ *
+ * <p>This class must be dynamically loaded in a MapR/Hadoop-free class loader.
+ */
+// this class is only instantiated via reflection
+@SuppressWarnings("unused")
+public class MapRFreeTests {
+
+	public static void test() throws Exception {
+		// make sure no MapR or Hadoop FS classes are in the classpath
+		try {
+			Class.forName("com.mapr.fs.MapRFileSystem");
+			fail("Cannot run test when MapR classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		try {
+			Class.forName("org.apache.hadoop.fs.FileSystem");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		try {
+			Class.forName("org.apache.hadoop.conf.Configuration");
+			fail("Cannot run test when Hadoop classes are in the classpath");
+		}
+		catch (ClassNotFoundException ignored) {}
+
+		// this method should complete without a linkage error
+		final MapRFsFactory factory = new MapRFsFactory();
+
+		// this method should also complete without a linkage error
+		factory.configure(new Configuration());
+
+		try {
+			factory.create(new URI("maprfs://somehost:9000/root/dir"));
+			fail("This statement should fail with an exception");
+		}
+		catch (IOException e) {
+			assertTrue(e.getMessage().contains("MapR"));
+			assertTrue(e.getMessage().contains("classpath"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
new file mode 100644
index 0000000..984668f
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the MapRFsFactory.
+ */
+public class MapRFsFactoryTest extends TestLogger {
+
+	/**
+	 * This test validates that the factory can be instantiated and configured even
+	 * when MapR and Hadoop classes are missing from the classpath.
+	 */
+	@Test
+	public void testInstantiationWithoutMapRClasses() throws Exception {
+		// we do reflection magic here to instantiate the test in another class
+		// loader, to make sure no MapR and Hadoop classes are in the classpath
+
+		final String testClassName = "org.apache.flink.runtime.fs.maprfs.MapRFreeTests";
+
+		URLClassLoader parent = (URLClassLoader) getClass().getClassLoader();
+		ClassLoader maprFreeClassLoader = new MapRFreeClassLoader(parent);
+		Class<?> testClass = Class.forName(testClassName, false, maprFreeClassLoader);
+		Method m = testClass.getDeclaredMethod("test");
+
+		try {
+			m.invoke(null);
+		}
+		catch (InvocationTargetException e) {
+			ExceptionUtils.rethrowException(e.getTargetException(), "exception in method");
+		}
+	}
+
+	@Test
+	public void testCreateFsWithAuthority() throws Exception {
+		final URI uri = URI.create("maprfs://localhost:12345/");
+
+		MapRFsFactory factory = new MapRFsFactory();
+
+		try {
+			factory.create(uri);
+			fail("should have failed with an exception");
+		}
+		catch (IOException e) {
+			// expected, because we have no CLDB config available
+		}
+	}
+
+	@Test
+	public void testCreateFsWithMissingAuthority() throws Exception {
+		final URI uri = URI.create("maprfs:///my/path");
+
+		MapRFsFactory factory = new MapRFsFactory();
+		factory.configure(new Configuration());
+
+		FileSystem fs = factory.create(uri);
+		assertEquals("maprfs", fs.getUri().getScheme());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class MapRFreeClassLoader extends URLClassLoader {
+
+		private final ClassLoader properParent;
+
+		MapRFreeClassLoader(URLClassLoader parent) {
+			super(parent.getURLs(), null);
+			properParent = parent;
+		}
+
+		@Override
+		public Class<?> loadClass(String name) throws ClassNotFoundException {
+			if (name.startsWith("com.mapr") || name.startsWith("org.apache.hadoop")) {
+				throw new ClassNotFoundException(name);
+			}
+			else if (name.startsWith("org.apache.log4j")) {
+				return properParent.loadClass(name);
+			}
+			else {
+				return super.loadClass(name);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties b/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml
new file mode 100644
index 0000000..b0199da
--- /dev/null
+++ b/flink-filesystems/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+
+	<artifactId>flink-filesystems</artifactId>
+	<name>flink-filesystems</name>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-hadoop-fs</module>
+		<module>flink-mapr-fs</module>
+	</modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 3ed7e70..71ebfb7 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -50,6 +50,17 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<!-- The Hadoop FS support has only an optional dependency on Hadoop and
+			gracefully handles absence of Hadoop classes -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- optional dependency on Hadoop, so that Hadoop classes are not always pulled in -->
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-hadoop2</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
deleted file mode 100644
index 1484c95..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.BlockLocation;
-
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Implementation of the {@link BlockLocation} interface for the
- * Hadoop Distributed File System.
- */
-public final class HadoopBlockLocation implements BlockLocation {
-
-	/**
-	 * Specifies the character separating the hostname from the domain name.
-	 */
-	private static final char DOMAIN_SEPARATOR = '.';
-
-	/**
-	 * Regular expression for an IPv4 address.
-	 */
-	private static final Pattern IPV4_PATTERN = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");
-
-	/**
-	 * The original Hadoop block location object.
-	 */
-	private final org.apache.hadoop.fs.BlockLocation blockLocation;
-
-	/**
-	 * Stores the hostnames without the domain suffix.
-	 */
-	private String[] hostnames;
-
-	/**
-	 * Creates a new block location.
-	 *
-	 * @param blockLocation
-	 *        the original HDFS block location
-	 */
-	public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
-
-		this.blockLocation = blockLocation;
-	}
-
-	@Override
-	public String[] getHosts() throws IOException {
-
-		/**
-		 * Unfortunately, the Hadoop API is not precise about if the list returned by BlockLocation.getHosts() contains
-		 * the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
-		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
-		 * sure it does not contain the domain suffix.
-		 */
-		if (this.hostnames == null) {
-
-			final String[] hadoopHostnames = blockLocation.getHosts();
-			this.hostnames = new String[hadoopHostnames.length];
-
-			for (int i = 0; i < hadoopHostnames.length; ++i) {
-				this.hostnames[i] = stripHostname(hadoopHostnames[i]);
-			}
-		}
-
-		return this.hostnames;
-	}
-
-	/**
-	 * Looks for a domain suffix in a FQDN and strips it if present.
-	 *
-	 * @param originalHostname
-	 *        the original hostname, possibly an FQDN
-	 * @return the stripped hostname without the domain suffix
-	 */
-	private static String stripHostname(final String originalHostname) {
-
-		// Check if the hostname contains the domain separator character
-		final int index = originalHostname.indexOf(DOMAIN_SEPARATOR);
-		if (index == -1) {
-			return originalHostname;
-		}
-
-		// Make sure we are not stripping an IPv4 address
-		final Matcher matcher = IPV4_PATTERN.matcher(originalHostname);
-		if (matcher.matches()) {
-			return originalHostname;
-		}
-
-		if (index == 0) {
-			throw new IllegalStateException("Hostname " + originalHostname + " starts with a " + DOMAIN_SEPARATOR);
-		}
-
-		return originalHostname.substring(0, index);
-	}
-
-	@Override
-	public long getLength() {
-
-		return this.blockLocation.getLength();
-	}
-
-	@Override
-	public long getOffset() {
-
-		return this.blockLocation.getOffset();
-	}
-
-	@Override
-	public int compareTo(final BlockLocation o) {
-
-		final long diff = getOffset() - o.getOffset();
-
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
deleted file mode 100644
index da50c4c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Concrete implementation of the {@link FSDataInputStream} for Hadoop's input streams.
- * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
- */
-public final class HadoopDataInputStream extends FSDataInputStream {
-
-	/**
-	 * Minimum amount of bytes to skip forward before we issue a seek instead of discarding read.
-	 *
-	 * <p>The current value is just a magic number. In the long run, this value could become configurable, but for now it
-	 * is a conservative, relatively small value that should bring safe improvements for small skips (e.g. in reading
-	 * meta data), that would hurt the most with frequent seeks.
-	 *
-	 * <p>The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
-	 * For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
-	 * avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
- * be the number of bytes that can be consumed sequentially within the seektime. Unfortunately, seektime is not
-	 * constant and devices, OS, and DFS potentially also use read buffers and read-ahead.
-	 */
-	public static final int MIN_SKIP_BYTES = 1024 * 1024;
-
-	/** The internal stream. */
-	private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
-
-	/**
-	 * Creates a new data input stream from the given Hadoop input stream.
-	 *
-	 * @param fsDataInputStream The Hadoop input stream
-	 */
-	public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
-		this.fsDataInputStream = checkNotNull(fsDataInputStream);
-	}
-
-	@Override
-	public void seek(long seekPos) throws IOException {
-		// We do some optimizations to avoid that some implementations of distributed FS perform
-		// expensive seeks when they are actually not needed.
-		long delta = seekPos - getPos();
-
-		if (delta > 0L && delta <= MIN_SKIP_BYTES) {
-			// Instead of a small forward seek, we skip over the gap
-			skipFully(delta);
-		} else if (delta != 0L) {
-			// For larger gaps and backward seeks, we do a real seek
-			forceSeek(seekPos);
-		} // Do nothing if delta is zero.
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		return fsDataInputStream.getPos();
-	}
-
-	@Override
-	public int read() throws IOException {
-		return fsDataInputStream.read();
-	}
-
-	@Override
-	public void close() throws IOException {
-		fsDataInputStream.close();
-	}
-
-	@Override
-	public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
-		return fsDataInputStream.read(buffer, offset, length);
-	}
-
-	@Override
-	public int available() throws IOException {
-		return fsDataInputStream.available();
-	}
-
-	@Override
-	public long skip(long n) throws IOException {
-		return fsDataInputStream.skip(n);
-	}
-
-	/**
-	 * Gets the wrapped Hadoop input stream.
-	 * @return The wrapped Hadoop input stream.
-	 */
-	public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
-		return fsDataInputStream;
-	}
-
-	/**
-	 * Positions the stream at the given location. In contrast to {@link #seek(long)}, this method will
-	 * always issue a "seek" command to the DFS and will not replace it with {@link #skip(long)} for small seeks.
-	 *
-	 * <p>Note that the underlying DFS implementation can still decide to skip instead of seek.
-	 *
-	 * @param seekPos the position to seek to.
-	 * @throws IOException Thrown if an I/O error occurs during the seek.
-	 */
-	public void forceSeek(long seekPos) throws IOException {
-		fsDataInputStream.seek(seekPos);
-	}
-
-	/**
-	 * Skips over a given amount of bytes in the stream.
-	 *
-	 * @param bytes the number of bytes to skip.
-	 * @throws IOException Thrown if an I/O error occurs while skipping.
-	 */
-	public void skipFully(long bytes) throws IOException {
-		while (bytes > 0) {
-			bytes -= fsDataInputStream.skip(bytes);
-		}
-	}
-}
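
For readers skimming the diff: the seek() method above trades small forward
seeks for read-and-discard skips. The following is a minimal standalone sketch
of that same decision logic, not part of the commit; SeekableStream is a
hypothetical stand-in for Hadoop's FSDataInputStream:

    import java.io.IOException;

    interface SeekableStream {
        long getPos() throws IOException;
        long skip(long n) throws IOException;
        void seek(long desired) throws IOException;
    }

    final class SeekUtil {
        static void seekEfficiently(SeekableStream in, long targetPos, long minSkipBytes) throws IOException {
            long delta = targetPos - in.getPos();
            if (delta > 0 && delta <= minSkipBytes) {
                // small forward gap: read and discard, often cheaper than a real seek
                while (delta > 0) {
                    long skipped = in.skip(delta);
                    if (skipped <= 0) {
                        throw new IOException("unexpected end of stream while skipping");
                    }
                    delta -= skipped;
                }
            } else if (delta != 0) {
                // large forward gap or any backward seek: issue a real seek
                in.seek(targetPos);
            }
            // delta == 0: already at the target position, nothing to do
        }
    }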

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
deleted file mode 100644
index 1b8d1a3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-
-import java.io.IOException;
-
-/**
- * Concrete implementation of the {@link FSDataOutputStream} for Hadoop's output streams.
- * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
- */
-public class HadoopDataOutputStream extends FSDataOutputStream {
-
-	private final org.apache.hadoop.fs.FSDataOutputStream fdos;
-
-	public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
-		if (fdos == null) {
-			throw new NullPointerException();
-		}
-		this.fdos = fdos;
-	}
-
-	@Override
-	public void write(int b) throws IOException {
-		fdos.write(b);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		fdos.write(b, off, len);
-	}
-
-	@Override
-	public void close() throws IOException {
-		fdos.close();
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		return fdos.getPos();
-	}
-
-	@Override
-	public void flush() throws IOException {
-		fdos.hflush();
-	}
-
-	@Override
-	public void sync() throws IOException {
-		fdos.hsync();
-	}
-
-	/**
-	 * Gets the wrapped Hadoop output stream.
-	 * @return The wrapped Hadoop output stream.
-	 */
-	public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream() {
-		return fdos;
-	}
-
-}
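
Note the mapping in the class above: Flink's flush() delegates to Hadoop's
hflush() (flushed data becomes visible to new readers), while sync() delegates
to hsync() (the datanodes are additionally asked to persist the data to disk).
A hedged usage sketch; the method name and payload are made up for
illustration, and java.io.IOException is assumed to be imported:

    static void writeDurably(HadoopDataOutputStream out, byte[] payload) throws IOException {
        out.write(payload, 0, payload.length);
        out.flush();  // hflush: flushed data is visible to new readers
        out.sync();   // hsync: datanodes persist the data to disk
    }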

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
deleted file mode 100644
index 17bb334..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
-
-/**
- * Concrete implementation of the {@link FileStatus} interface for the
- * Hadoop Distributed File System.
- */
-public final class HadoopFileStatus implements FileStatus {
-
-	private org.apache.hadoop.fs.FileStatus fileStatus;
-
-	/**
-	 * Creates a new file status from a HDFS file status.
-	 *
-	 * @param fileStatus
-	 *        the HDFS file status
-	 */
-	public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
-		this.fileStatus = fileStatus;
-	}
-
-	@Override
-	public long getLen() {
-		return fileStatus.getLen();
-	}
-
-	@Override
-	public long getBlockSize() {
-		long blocksize = fileStatus.getBlockSize();
-		if (blocksize > fileStatus.getLen()) {
-			return fileStatus.getLen();
-		}
-
-		return blocksize;
-	}
-
-	@Override
-	public long getAccessTime() {
-		return fileStatus.getAccessTime();
-	}
-
-	@Override
-	public long getModificationTime() {
-		return fileStatus.getModificationTime();
-	}
-
-	@Override
-	public short getReplication() {
-		return fileStatus.getReplication();
-	}
-
-	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
-		return this.fileStatus;
-	}
-
-	@Override
-	public Path getPath() {
-		return new Path(fileStatus.getPath().toString());
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public boolean isDir() {
-		return fileStatus.isDir();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
deleted file mode 100644
index 4ebf4bc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-import java.net.URI;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link FileSystem} that wraps an {@link org.apache.hadoop.fs.FileSystem Hadoop File System}.
- */
-public final class HadoopFileSystem extends FileSystem {
-
-	/** The wrapped Hadoop File System. */
-	private final org.apache.hadoop.fs.FileSystem fs;
-
-	/**
-	 * Wraps the given Hadoop File System object as a Flink File System object.
-	 * The given Hadoop file system object is expected to be initialized already.
-	 *
-	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
-	 */
-	public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
-		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
-	}
-
-	/**
-	 * Gets the underlying Hadoop FileSystem.
-	 * @return The underlying Hadoop FileSystem.
-	 */
-	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
-		return this.fs;
-	}
-
-	// ------------------------------------------------------------------------
-	//  file system methods
-	// ------------------------------------------------------------------------
-
-	@Override
-	public Path getWorkingDirectory() {
-		return new Path(this.fs.getWorkingDirectory().toUri());
-	}
-
-	public Path getHomeDirectory() {
-		return new Path(this.fs.getHomeDirectory().toUri());
-	}
-
-	@Override
-	public URI getUri() {
-		return fs.getUri();
-	}
-
-	@Override
-	public FileStatus getFileStatus(final Path f) throws IOException {
-		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		return new HadoopFileStatus(status);
-	}
-
-	@Override
-	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
-			throws IOException {
-		if (!(file instanceof HadoopFileStatus)) {
-			throw new IOException("file is not an instance of HadoopFileStatus");
-		}
-
-		final HadoopFileStatus f = (HadoopFileStatus) file;
-
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
-			start, len);
-
-		// Wrap up HDFS specific block location objects
-		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
-		for (int i = 0; i < distBlkLocations.length; i++) {
-			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
-		}
-
-		return distBlkLocations;
-	}
-
-	@Override
-	public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	public HadoopDataInputStream open(final Path f) throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	@SuppressWarnings("deprecation")
-	public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
-			final short replication, final long blockSize) throws IOException {
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-			new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
-		return new HadoopDataOutputStream(fdos);
-	}
-
-	@Override
-	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
-		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
-			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
-		return new HadoopDataOutputStream(fsDataOutputStream);
-	}
-
-	@Override
-	public boolean delete(final Path f, final boolean recursive) throws IOException {
-		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()), recursive);
-	}
-
-	@Override
-	public FileStatus[] listStatus(final Path f) throws IOException {
-		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		final FileStatus[] files = new FileStatus[hadoopFiles.length];
-
-		// Convert types
-		for (int i = 0; i < files.length; i++) {
-			files[i] = new HadoopFileStatus(hadoopFiles[i]);
-		}
-
-		return files;
-	}
-
-	@Override
-	public boolean mkdirs(final Path f) throws IOException {
-		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
-	}
-
-	@Override
-	public boolean rename(final Path src, final Path dst) throws IOException {
-		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
-			new org.apache.hadoop.fs.Path(dst.toString()));
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public long getDefaultBlockSize() {
-		return this.fs.getDefaultBlockSize();
-	}
-
-	@Override
-	public boolean isDistributedFS() {
-		return true;
-	}
-}
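
Since HadoopFileSystem only wraps an already initialized
org.apache.hadoop.fs.FileSystem, bridging from an existing Hadoop setup is
straightforward. A sketch, assuming Hadoop is on the classpath; the URI and
file path are placeholders:

    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    org.apache.hadoop.fs.FileSystem hadoopFs =
            org.apache.hadoop.fs.FileSystem.get(java.net.URI.create("hdfs://namenode:8020/"), conf);

    // wrap the initialized Hadoop file system as a Flink FileSystem
    HadoopFileSystem flinkFs = new HadoopFileSystem(hadoopFs);
    FileStatus status = flinkFs.getFileStatus(new Path("/user/flink/data.csv"));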

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
deleted file mode 100644
index d3b1b89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
-import org.apache.flink.runtime.util.HadoopUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A file system factory for Hadoop-based file systems.
- *
- * <p>This factory calls Hadoop's mechanism to find a file system implementation for a given file
- * system scheme (a {@link org.apache.hadoop.fs.FileSystem}) and wraps it as a Flink file system
- * (a {@link org.apache.flink.core.fs.FileSystem}).
- */
-public class HadoopFsFactory implements FileSystemFactory {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopFsFactory.class);
-
-	/** Hadoop's configuration for the file systems. */
-	private org.apache.hadoop.conf.Configuration hadoopConfig;
-
-	@Override
-	public void configure(Configuration config) {
-		hadoopConfig = HadoopUtils.getHadoopConfiguration(config);
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		checkNotNull(fsUri, "fsUri");
-
-		final String scheme = fsUri.getScheme();
-		checkArgument(scheme != null, "file system has null scheme");
-
-		// -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
-		final org.apache.hadoop.conf.Configuration hadoopConfig;
-		if (this.hadoopConfig != null) {
-			hadoopConfig = this.hadoopConfig;
-		}
-		else {
-			LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
-					+ " Using configuration from the classpath.");
-
-			hadoopConfig = new org.apache.hadoop.conf.Configuration();
-		}
-
-		// -- (2) create the proper URI to initialize the file system
-
-		final URI initUri;
-		if (fsUri.getAuthority() != null) {
-			initUri = fsUri;
-		}
-		else {
-			LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)", fsUri);
-
-			String configEntry = hadoopConfig.get("fs.defaultFS", null);
-			if (configEntry == null) {
-				// fs.default.name deprecated as of hadoop 2.2.0 - see
-				// http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
-				configEntry = hadoopConfig.get("fs.default.name", null);
-			}
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
-			}
-
-			if (configEntry == null) {
-				throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
-						"Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
-			}
-			else {
-				try {
-					initUri = URI.create(configEntry);
-				}
-				catch (IllegalArgumentException e) {
-					throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
-							"The configuration contains an invalid file system default name " +
-							"('fs.default.name' or 'fs.defaultFS'): " + configEntry);
-				}
-
-				if (initUri.getAuthority() == null) {
-					throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
-							"Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
-							"contains no valid authority component (like hdfs namenode, S3 host, etc)");
-				}
-			}
-		}
-
-		// -- (3) get the Hadoop file system class for that scheme
-
-		final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
-		try {
-			fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
-		}
-		catch (IOException e) {
-			throw new UnsupportedFileSystemSchemeException(
-					"Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
-							"Either no file system implementation exists for that scheme, " +
-							"or the relevant classes are missing from the classpath.", e);
-		}
-
-		LOG.debug("Instantiating Hadoop File System {} for file system scheme {}", fsClass.getName(), scheme);
-
-		// -- (4) instantiate the Hadoop file system
-
-		final org.apache.hadoop.fs.FileSystem hadoopFs;
-		try {
-			hadoopFs = fsClass.newInstance();
-		}
-		catch (Exception e) {
-			throw new IOException("The Hadoop file system class '" + fsClass.getName() + "' cannot be instantiated.", e);
-		}
-
-		// -- (5) configure the Hadoop file system
-
-		try {
-			hadoopFs.initialize(initUri, hadoopConfig);
-		}
-		catch (UnknownHostException e) {
-			String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
-					"), specified by either the file URI or the configuration, cannot be resolved.";
-
-			throw new IOException(message, e);
-		}
-		catch (Exception e) {
-			throw new IOException("Hadoop file system " + fsClass.getName() + " for scheme '" + scheme +
-					"' could not be initialized.", e);
-		}
-
-		return new HadoopFileSystem(hadoopFs);
-	}
-
-	private static String getMissingAuthorityErrorPrefix(URI fsURI) {
-		return "The given file system URI (" + fsURI.toString() + ") did not describe the authority " +
-				"(for example, an HDFS NameNode address/port or S3 host). " +
-				"The attempt to use a configured default authority failed: ";
-	}
-}
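
A note on step (2) in create() above: a URI without an authority, such as
hdfs:///tmp/output, is only resolvable if the Hadoop configuration names a
default file system. A hedged sketch of the call path; the URI is a
placeholder:

    HadoopFsFactory factory = new HadoopFsFactory();
    factory.configure(new org.apache.flink.configuration.Configuration());

    // succeeds only if 'fs.defaultFS' (or the deprecated 'fs.default.name')
    // is set in the Hadoop configuration visible to the factory
    FileSystem fs = factory.create(java.net.URI.create("hdfs:///tmp/output"));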

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
deleted file mode 100644
index 9e12f96..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.maprfs;
-
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Concrete implementation of the {@link FileSystem} base class for the MapR
- * file system. The class contains MapR specific code to initialize the
- * connection to the file system. Apart from that, the code mainly reuses the
- * existing HDFS wrapper code.
- */
-@SuppressWarnings("unused") // is only instantiated via reflection
-public final class MapRFileSystem extends FileSystem {
-
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class);
-
-	/**
-	 * The name of MapR's class containing the implementation of the Hadoop HDFS
-	 * interface.
-	 */
-	private static final String MAPR_FS_IMPL_CLASS = "com.mapr.fs.MapRFileSystem";
-
-	/**
-	 * Name of the environment variable to determine the location of the MapR
-	 * installation.
-	 */
-	private static final String MAPR_HOME_ENV = "MAPR_HOME";
-
-	/**
-	 * The default location of the MapR installation.
-	 */
-	private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
-
-	/**
-	 * The path relative to the MAPR_HOME where MapR stores how to access the
-	 * configured clusters.
-	 */
-	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The MapR implementation of the Hadoop HDFS interface.
-	 */
-	private final org.apache.hadoop.fs.FileSystem fs;
-
-	/**
-	 * Creates a new MapRFileSystem object to access the MapR file system.
-	 *
-	 * @throws IOException
-	 *             thrown if the required MapR classes cannot be found
-	 */
-	public MapRFileSystem(URI fsURI) throws IOException {
-		checkNotNull(fsURI, "fsURI");
-
-		LOG.debug("Trying to load class {} to access the MapR file system", MAPR_FS_IMPL_CLASS);
-
-		final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
-		try {
-			fsClass = Class.forName(MAPR_FS_IMPL_CLASS).asSubclass(org.apache.hadoop.fs.FileSystem.class);
-		}
-		catch (Exception e) {
-			throw new IOException(
-					String.format("Cannot load MapR File System class '%s'. " +
-							"Please check that the MapR Hadoop libraries are in the classpath.",
-							MAPR_FS_IMPL_CLASS), e);
-		}
-
-		LOG.info("Initializing MapR file system for URI {}", fsURI);
-
-		final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-		final org.apache.hadoop.fs.FileSystem fs;
-
-		final String authority = fsURI.getAuthority();
-		if (authority == null || authority.isEmpty()) {
-
-			// Use the default constructor to instantiate MapR file system object
-
-			try {
-				fs = fsClass.newInstance();
-			}
-			catch (Exception e) {
-				throw new IOException(e);
-			}
-		} else {
-
-			// We have an authority, check the MapR cluster configuration to
-			// find the CLDB locations.
-			final String[] cldbLocations = getCLDBLocations(authority);
-
-			// Find the appropriate constructor
-			try {
-				final Constructor<? extends org.apache.hadoop.fs.FileSystem> constructor =
-						fsClass.getConstructor(String.class, String[].class);
-
-				fs = constructor.newInstance(authority, cldbLocations);
-			}
-			catch (InvocationTargetException e) {
-				if (e.getTargetException() instanceof IOException) {
-					throw (IOException) e.getTargetException();
-				} else {
-					throw new IOException(e.getTargetException());
-				}
-			}
-			catch (Exception e) {
-				throw new IOException(e);
-			}
-		}
-
-		// now initialize the Hadoop File System object
-		fs.initialize(fsURI, conf);
-
-		// all good as it seems
-		this.fs = fs;
-	}
-
-	// ------------------------------------------------------------------------
-	//  file system methods
-	// ------------------------------------------------------------------------
-
-	@Override
-	public Path getWorkingDirectory() {
-		return new Path(this.fs.getWorkingDirectory().toUri());
-	}
-
-	public Path getHomeDirectory() {
-		return new Path(this.fs.getHomeDirectory().toUri());
-	}
-
-	@Override
-	public URI getUri() {
-		return this.fs.getUri();
-	}
-
-	@Override
-	public FileStatus getFileStatus(final Path f) throws IOException {
-
-		final org.apache.hadoop.fs.FileStatus status = this.fs
-				.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
-
-		return new HadoopFileStatus(status);
-	}
-
-	@Override
-	public BlockLocation[] getFileBlockLocations(final FileStatus file,
-			final long start, final long len) throws IOException {
-
-		if (!(file instanceof HadoopFileStatus)) {
-			throw new IOException(
-					"file is not an instance of HadoopFileStatus");
-		}
-
-		final HadoopFileStatus f = (HadoopFileStatus) file;
-
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs
-				.getFileBlockLocations(f.getInternalFileStatus(), start, len);
-
-		// Wrap up HDFS specific block location objects
-		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
-		for (int i = 0; i < distBlkLocations.length; i++) {
-			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
-		}
-
-		return distBlkLocations;
-	}
-
-	@Override
-	public FSDataInputStream open(final Path f, final int bufferSize)
-			throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	public FSDataInputStream open(final Path f) throws IOException {
-		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path);
-
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public FSDataOutputStream create(final Path f, final boolean overwrite,
-			final int bufferSize, final short replication, final long blockSize)
-			throws IOException {
-
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-				new org.apache.hadoop.fs.Path(f.toString()), overwrite,
-				bufferSize, replication, blockSize);
-
-		return new HadoopDataOutputStream(fdos);
-	}
-
-	@Override
-	public FSDataOutputStream create(final Path f, final WriteMode overwrite)
-			throws IOException {
-
-		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
-				new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
-
-		return new HadoopDataOutputStream(fdos);
-	}
-
-	@Override
-	public boolean delete(final Path f, final boolean recursive)
-			throws IOException {
-
-		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()),
-				recursive);
-	}
-
-	@Override
-	public FileStatus[] listStatus(final Path f) throws IOException {
-
-		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs
-				.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
-		final FileStatus[] files = new FileStatus[hadoopFiles.length];
-
-		// Convert types
-		for (int i = 0; i < files.length; i++) {
-			files[i] = new HadoopFileStatus(hadoopFiles[i]);
-		}
-
-		return files;
-	}
-
-	@Override
-	public boolean mkdirs(final Path f) throws IOException {
-
-		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
-	}
-
-	@Override
-	public boolean rename(final Path src, final Path dst) throws IOException {
-
-		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
-				new org.apache.hadoop.fs.Path(dst.toString()));
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public long getDefaultBlockSize() {
-		return this.fs.getDefaultBlockSize();
-	}
-
-	@Override
-	public boolean isDistributedFS() {
-		return true;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Retrieves the CLDB locations for the given MapR cluster name.
-	 *
-	 * @param authority
-	 *            the name of the MapR cluster
-	 * @return a list of CLDB locations
-	 * @throws IOException
-	 *             thrown if the CLDB locations for the given MapR cluster name
-	 *             cannot be determined
-	 */
-	private static String[] getCLDBLocations(final String authority) throws IOException {
-
-		// Determine the MapR home
-		String maprHome = System.getenv(MAPR_HOME_ENV);
-		if (maprHome == null) {
-			maprHome = DEFAULT_MAPR_HOME;
-		}
-
-		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format(
-					"Trying to retrieve MapR cluster configuration from %s",
-					maprClusterConf));
-		}
-
-		// Read the cluster configuration file, format is specified at
-		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
-
-		try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
-
-			String line;
-			while ((line = br.readLine()) != null) {
-
-				// Normalize the string
-				line = line.trim();
-				line = line.replace('\t', ' ');
-
-				final String[] fields = line.split(" ");
-				if (fields.length < 1) {
-					continue;
-				}
-
-				final String clusterName = fields[0];
-
-				if (!clusterName.equals(authority)) {
-					continue;
-				}
-
-				final List<String> cldbLocations = new ArrayList<>();
-
-				for (int i = 1; i < fields.length; ++i) {
-
-					// Make sure this is not a key-value pair MapR recently
-					// introduced in the file format along with their security
-					// features.
-					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
-						cldbLocations.add(fields[i]);
-					}
-				}
-
-				if (cldbLocations.isEmpty()) {
-					throw new IOException(
-							String.format(
-									"%s contains entry for cluster %s but no CLDB locations.",
-									maprClusterConf, authority));
-				}
-
-				return cldbLocations.toArray(new String[cldbLocations.size()]);
-			}
-
-		}
-
-		throw new IOException(String.format(
-				"Unable to find CLDB locations for cluster %s", authority));
-	}
-}
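
For reference, getCLDBLocations() above parses mapr-clusters.conf lines of the
form "clusterName [key=value ...] host:port host:port ...". A made-up example
entry; cluster name and hosts are placeholders:

    my.cluster.com secure=false cldb1.example.com:7222 cldb2.example.com:7222

For a URI like maprfs://my.cluster.com/some/path, the parser matches the first
field against the URI authority, skips key=value tokens, and returns
["cldb1.example.com:7222", "cldb2.example.com:7222"] as the CLDB locations.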

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
deleted file mode 100644
index ca0630c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.ConfigConstants;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Collection;
-
-/**
- * Utility class for working with Hadoop-related classes. This should only be used if Hadoop
- * is on the classpath.
- */
-public class HadoopUtils {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
-
-	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
-
-	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
-
-		Configuration result = new Configuration();
-		boolean foundHadoopConfiguration = false;
-
-		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
-		// the hdfs configuration
-		// Try to load HDFS configuration from Hadoop's own configuration files
-		// 1. approach: Flink configuration
-		final String hdfsDefaultPath =
-			flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
-
-		if (hdfsDefaultPath != null) {
-			result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
-			LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
-			foundHadoopConfiguration = true;
-		} else {
-			LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
-		}
-
-		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
-		if (hdfsSitePath != null) {
-			result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
-			LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
-			foundHadoopConfiguration = true;
-		} else {
-			LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
-		}
-
-		// 2. approach: environment variables
-		String[] possibleHadoopConfPaths = new String[4];
-		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-
-		if (System.getenv("HADOOP_HOME") != null) {
-			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
-			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
-		}
-
-		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
-			if (possibleHadoopConfPath != null) {
-				if (new File(possibleHadoopConfPath).exists()) {
-					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
-						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-						LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-						foundHadoopConfiguration = true;
-					}
-					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
-						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-						LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-						foundHadoopConfiguration = true;
-					}
-				}
-			}
-		}
-
-		if (!foundHadoopConfiguration) {
-			LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
-				"(Flink configuration, environment variables).");
-		}
-
-		return result;
-	}
-
-	/**
-	 * Indicates whether the current user has an HDFS delegation token.
-	 */
-	public static boolean hasHDFSDelegationToken() throws Exception {
-		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
-		for (Token<? extends TokenIdentifier> token : usrTok) {
-			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
-				return true;
-			}
-		}
-		return false;
-	}
-}
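
The lookup order implemented above is: explicit Flink options first, then the
environment. A hedged flink-conf.yaml sketch; the paths are placeholders and
the key names are the ones assumed to back ConfigConstants.HDFS_DEFAULT_CONFIG,
HDFS_SITE_CONFIG and PATH_HADOOP_CONFIG:

    # 1. point Flink at the individual Hadoop config files ...
    fs.hdfs.hdfsdefault: /etc/hadoop/conf/core-site.xml
    fs.hdfs.hdfssite: /etc/hadoop/conf/hdfs-site.xml

    # ... or at the Hadoop conf directory
    fs.hdfs.hadoopconf: /etc/hadoop/conf

If none of these are set, HADOOP_CONF_DIR and HADOOP_HOME are consulted as
fallbacks, as the code above shows.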


[3/3] flink git commit: [FLINK-7768] [core] Load File Systems via Java Service abstraction

Posted by se...@apache.org.
[FLINK-7768] [core] Load File Systems via Java Service abstraction

This changes the discovery mechanism of file systems from static class name configurations
to a service mechanism (META-INF/services).

As part of that, it factors the HDFS and MapR FS implementations out into separate modules.

With this change, users can add new filesystem implementations and make them available
by simply adding them to the class path.
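
Concretely, a file system implementation can now be plugged in by shipping a
factory plus a service entry on the class path. A minimal sketch; the scheme,
class names, and the MyFileSystem type are made up for illustration:

    package com.example.fs;

    import java.io.IOException;
    import java.net.URI;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemFactory;

    public class MyFsFactory implements FileSystemFactory {

        @Override
        public String getScheme() {
            return "myfs";  // URIs like myfs://host/path are routed to this factory
        }

        @Override
        public void configure(Configuration config) {
            // remember settings for future create() calls, if needed
        }

        @Override
        public FileSystem create(URI fsUri) throws IOException {
            return new MyFileSystem(fsUri);  // hypothetical FileSystem subclass
        }
    }

together with a service file
META-INF/services/org.apache.flink.core.fs.FileSystemFactory containing the
single line:

    com.example.fs.MyFsFactory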


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77e3701c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77e3701c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77e3701c

Branch: refs/heads/master
Commit: 77e3701ca1f8bfab33a07f11992955eb131126c3
Parents: bad3df5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 5 18:12:21 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 6 17:45:13 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 125 +++++-
 .../apache/flink/core/fs/FileSystemFactory.java |   5 +
 .../flink/core/fs/UnsupportedSchemeFactory.java |  72 ++++
 .../HadoopFileSystemFactoryLoader.java          |  81 ----
 .../fs/factories/LocalFileSystemFactory.java    |  44 ---
 .../flink/core/fs/factories/MapRFsFactory.java  |  75 ----
 .../fs/factories/UnsupportedSchemeFactory.java  |  65 ----
 .../core/fs/local/LocalFileSystemFactory.java   |  49 +++
 .../io/DelimitedInputFormatSamplingTest.java    |   6 -
 .../apache/flink/testutils/TestFileSystem.java  |  31 +-
 .../org.apache.flink.core.fs.FileSystemFactory  |  16 +
 flink-dist/pom.xml                              |  15 +
 flink-filesystems/flink-hadoop-fs/pom.xml       |  62 +++
 .../runtime/fs/hdfs/HadoopBlockLocation.java    | 133 +++++++
 .../runtime/fs/hdfs/HadoopDataInputStream.java  | 139 +++++++
 .../runtime/fs/hdfs/HadoopDataOutputStream.java |  78 ++++
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java |  86 +++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 171 +++++++++
 .../flink/runtime/fs/hdfs/HadoopFsFactory.java  | 186 +++++++++
 .../apache/flink/runtime/util/HadoopUtils.java  | 123 ++++++
 .../fs/hdfs/HadoopConfigLoadingTest.java        | 194 ++++++++++
 .../fs/hdfs/HadoopDataInputStreamTest.java      | 132 +++++++
 .../fs/hdfs/HadoopFreeFsFactoryTest.java        |  83 ++++
 .../flink/runtime/fs/hdfs/HadoopFreeTests.java  |  66 ++++
 .../runtime/fs/hdfs/HadoopFsFactoryTest.java    |  63 ++++
 .../src/test/resources/core-site.xml            |  29 ++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-filesystems/flink-mapr-fs/pom.xml         |  77 ++++
 .../flink/runtime/fs/maprfs/MapRFileSystem.java | 175 +++++++++
 .../flink/runtime/fs/maprfs/MapRFsFactory.java  |  75 ++++
 .../org.apache.flink.core.fs.FileSystemFactory  |  16 +
 .../runtime/fs/maprfs/FileSystemAccessTest.java |  42 +++
 .../flink/runtime/fs/maprfs/MapRFreeTests.java  |  74 ++++
 .../runtime/fs/maprfs/MapRFsFactoryTest.java    | 116 ++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-filesystems/pom.xml                       |  43 +++
 flink-runtime/pom.xml                           |  11 +
 .../runtime/fs/hdfs/HadoopBlockLocation.java    | 133 -------
 .../runtime/fs/hdfs/HadoopDataInputStream.java  | 139 -------
 .../runtime/fs/hdfs/HadoopDataOutputStream.java |  78 ----
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java |  86 -----
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 171 ---------
 .../flink/runtime/fs/hdfs/HadoopFsFactory.java  | 167 --------
 .../flink/runtime/fs/maprfs/MapRFileSystem.java | 377 -------------------
 .../apache/flink/runtime/util/HadoopUtils.java  | 121 ------
 .../fs/hdfs/HadoopDataInputStreamTest.java      | 131 -------
 pom.xml                                         |   1 +
 tools/travis_mvn_watchdog.sh                    |  17 +
 48 files changed, 2530 insertions(+), 1703 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 6c18735..a6c9b50 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -30,10 +30,12 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.factories.HadoopFileSystemFactoryLoader;
-import org.apache.flink.core.fs.factories.MapRFsFactory;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.core.fs.factories.LocalFileSystemFactory;
+import org.apache.flink.core.fs.local.LocalFileSystemFactory;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -42,7 +44,8 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
-import java.util.Map;
+import java.util.Iterator;
+import java.util.ServiceLoader;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -198,6 +201,9 @@ public abstract class FileSystem {
 
 	// ------------------------------------------------------------------------
 
+	/** Logger for all FileSystem work */
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
+
 	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
 	 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
 	private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
@@ -206,27 +212,23 @@ public abstract class FileSystem {
 	private static final ReentrantLock LOCK = new ReentrantLock(true);
 
 	/** Cache for file systems, by scheme + authority. */
-	private static final Map<FSKey, FileSystem> CACHE = new HashMap<>();
+	private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();
 
 	/** Mapping of file system schemes to  the corresponding implementation factories. */
-	private static final Map<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();
+	private static final HashMap<String, FileSystemFactory> FS_FACTORIES = loadFileSystems();
 
 	/** The default factory that is used when no scheme matches. */
-	private static final FileSystemFactory FALLBACK_FACTORY = HadoopFileSystemFactoryLoader.loadFactory();
+	private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();
 
 	/** The default filesystem scheme to be used, configured during process-wide initialization.
 	 * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
 	private static URI DEFAULT_SCHEME;
+	
 
 	// ------------------------------------------------------------------------
 	//  Initialization
 	// ------------------------------------------------------------------------
 
-	static {
-		FS_FACTORIES.put("file", new LocalFileSystemFactory());
-		FS_FACTORIES.put("maprfs", new MapRFsFactory());
-	}
-
 	/**
 	 * Initializes the shared file system settings. 
 	 *
@@ -892,6 +894,105 @@ public abstract class FileSystem {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Loads the factories for the file systems directly supported by Flink.
+	 * Aside from the {@link LocalFileSystem}, these file systems are loaded
+	 * via Java's service framework.
+	 *
+	 * @return A map from the file system scheme to the corresponding file system factory.
+	 */
+	private static HashMap<String, FileSystemFactory> loadFileSystems() {
+		final HashMap<String, FileSystemFactory> map = new HashMap<>();
+
+		// by default, we always have the local file system factory
+		map.put("file", new LocalFileSystemFactory());
+
+		LOG.debug("Loading extension file systems via services");
+
+		try {
+			ServiceLoader<FileSystemFactory> serviceLoader = ServiceLoader.load(FileSystemFactory.class);
+			Iterator<FileSystemFactory> iter = serviceLoader.iterator();
+
+			// we explicitly use an iterator here (rather than for-each) because that way
+			// we can catch errors in individual service instantiations
+
+			//noinspection WhileLoopReplaceableByForEach
+			while (iter.hasNext()) {
+				try {
+					FileSystemFactory factory = iter.next();
+					String scheme = factory.getScheme();
+					map.put(scheme, factory);
+					LOG.debug("Added file system {}:{}", scheme, factory.getClass().getName());
+				}
+				catch (Throwable t) {
+					// catching Throwable here to handle various forms of class loading
+					// and initialization errors
+					ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+					LOG.error("Failed to load a file system via services", t);
+				}
+			}
+		}
+		catch (Throwable t) {
+			// catching Throwable here to handle various forms of class loading
+			// and initialization errors
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			LOG.error("Failed to load additional file systems via services", t);
+		}
+
+		return map;
+	}
+	
+	/**
+	 * Utility loader for the Hadoop file system factory.
+	 * We treat the Hadoop FS factory in a special way, because we use it as a
+	 * catch-all for file system schemes not supported directly in Flink.
+	 *
+	 * <p>This method does a set of eager checks for availability of certain classes, to
+	 * be able to give better error messages.
+	 */
+	private static FileSystemFactory loadHadoopFsFactory() {
+		final ClassLoader cl = FileSystem.class.getClassLoader();
+
+		// first, see if the Flink runtime classes are available
+		final Class<? extends FileSystemFactory> factoryClass;
+		try {
+			factoryClass = Class.forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl).asSubclass(FileSystemFactory.class);
+		}
+		catch (ClassNotFoundException e) {
+			LOG.info("No Flink runtime dependency present. " + 
+					"The extended set of supported File Systems via Hadoop is not available.");
+			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
+		}
+		catch (Exception | LinkageError e) {
+			LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
+			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
+		}
+
+		// check (for eager and better exception messages) if the Hadoop classes are available here
+		try {
+			Class.forName("org.apache.hadoop.conf.Configuration", false, cl);
+			Class.forName("org.apache.hadoop.fs.FileSystem", false, cl);
+		}
+		catch (ClassNotFoundException e) {
+			LOG.info("Hadoop is not in the classpath/dependencies. " +
+					"The extended set of supported File Systems via Hadoop is not available.");
+			return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
+		}
+
+		// Create the factory.
+		try {
+			return factoryClass.newInstance();
+		}
+		catch (Exception | LinkageError e) {
+			LOG.warn("Flink's Hadoop file system factory could not be created", e);
+			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * An identifier of a file system, via its scheme and its authority.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
index 503f21f..982da35 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
@@ -34,6 +34,11 @@ import java.net.URI;
 public interface FileSystemFactory {
 
 	/**
+	 * Gets the scheme of the file system created by this factory.
+	 */
+	String getScheme();
+
+	/**
 	 * Applies the given configuration to this factory. All future file system
 	 * instantiations via {@link #create(URI)} should take the configuration into
 	 * account.

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
new file mode 100644
index 0000000..234b49f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system factory that always throws an {@link UnsupportedFileSystemSchemeException} when asked to create a file system.
+ */
+@Internal
+class UnsupportedSchemeFactory implements FileSystemFactory {
+
+	private final String exceptionMessage;
+
+	@Nullable
+	private final Throwable exceptionCause;
+
+	public UnsupportedSchemeFactory(String exceptionMessage) {
+		this(exceptionMessage, null);
+	}
+
+	public UnsupportedSchemeFactory(String exceptionMessage, @Nullable Throwable exceptionCause) {
+		this.exceptionMessage = checkNotNull(exceptionMessage);
+		this.exceptionCause = exceptionCause;
+	}
+
+	@Override
+	public String getScheme() {
+		return "n/a";
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		// nothing to do here
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		if (exceptionCause == null) {
+			throw new UnsupportedFileSystemSchemeException(exceptionMessage);
+		}
+		else {
+			throw new UnsupportedFileSystemSchemeException(exceptionMessage, exceptionCause);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java
deleted file mode 100644
index ed584ef..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/HadoopFileSystemFactoryLoader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A utility class to check and reflectively load the Hadoop file system factory.
- */
-public class HadoopFileSystemFactoryLoader {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystemFactoryLoader.class);
-
-	private static final String FACTORY_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
-
-	private static final String HADOOP_CONFIG_CLASS = "org.apache.hadoop.conf.Configuration";
-
-	private static final String HADOOP_FS_CLASS = "org.apache.hadoop.fs.FileSystem";
-
-
-	/**
-	 * Loads the FileSystemFactory for the Hadoop-backed file systems.
-	 */
-	public static FileSystemFactory loadFactory() {
-		final ClassLoader cl = HadoopFileSystemFactoryLoader.class.getClassLoader();
-
-		// first, see if the Flink runtime classes are available
-		final Class<? extends FileSystemFactory> factoryClass;
-		try {
-			factoryClass = Class.forName(FACTORY_CLASS, false, cl).asSubclass(FileSystemFactory.class);
-		}
-		catch (ClassNotFoundException e) {
-			LOG.info("No Flink runtime dependency present - the extended set of supported File Systems " +
-					"via Hadoop is not available.");
-			return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
-		}
-		catch (Exception | LinkageError e) {
-			LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
-			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
-		}
-
-		// check (for eager and better exception messages) if the Hadoop classes are available here
-		try {
-			Class.forName(HADOOP_CONFIG_CLASS, false, cl);
-			Class.forName(HADOOP_FS_CLASS, false, cl);
-		}
-		catch (ClassNotFoundException e) {
-			LOG.info("Hadoop is not in the classpath/dependencies - the extended set of supported File Systems " +
-					"via Hadoop is not available.");
-			return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
-		}
-
-		// Create the factory.
-		try {
-			return factoryClass.newInstance();
-		}
-		catch (Exception | LinkageError e) {
-			LOG.warn("Flink's Hadoop file system factory could not be created", e);
-			return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java
deleted file mode 100644
index fc04de5..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/LocalFileSystemFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-
-import java.net.URI;
-
-/**
- * A factory for the {@link LocalFileSystem}.
- */
-@PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
-
-	@Override
-	public void configure(Configuration config) {
-		// the local file system takes no configuration, so nothing to do here
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) {
-		return LocalFileSystem.getSharedInstance();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java
deleted file mode 100644
index 271e5bc..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/MapRFsFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
-
-/**
- * A factory for the MapR file system.
- * 
- * <p>This factory tries to reflectively instantiate the MapR file system. It can only be
- * used when the MapR FS libraries are in the classpath.
- */
-public class MapRFsFactory implements FileSystemFactory {
-
-	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
-
-	@Override
-	public void configure(Configuration config) {
-		// nothing to configure based on the configuration here
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		try {
-			Class<? extends FileSystem> fsClass = Class.forName(
-					MAPR_FILESYSTEM_CLASS, false, getClass().getClassLoader()).asSubclass(FileSystem.class);
-
-			Constructor<? extends FileSystem> constructor = fsClass.getConstructor(URI.class);
-
-			try {
-				return constructor.newInstance(fsUri);
-			}
-			catch (InvocationTargetException e) {
-				throw e.getTargetException();
-			}
-		}
-		catch (ClassNotFoundException e) {
-			throw new IOException("Could not load MapR file system class '" + MAPR_FILESYSTEM_CLASS + 
-					"\'. Please make sure the Flink runtime classes are part of the classpath or dependencies.", e);
-		}
-		catch (LinkageError e) {
-			throw new IOException("Some of the MapR FS or required Hadoop classes seem to be missing or incompatible. " 
-					+ "Please check that a compatible version of the MapR Hadoop libraries is in the classpath.", e);
-		}
-		catch (IOException e) {
-			throw e;
-		}
-		catch (Throwable t) {
-			throw new IOException("Could not instantiate MapR file system class '" + MAPR_FILESYSTEM_CLASS + "'.", t);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java
deleted file mode 100644
index 8464b63..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/factories/UnsupportedSchemeFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs.factories;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.net.URI;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A file system factory to throw an UnsupportedFileSystemSchemeException when called.
- */
-public class UnsupportedSchemeFactory implements FileSystemFactory {
-
-	private final String exceptionMessage;
-
-	@Nullable
-	private final Throwable exceptionCause;
-
-	public UnsupportedSchemeFactory(String exceptionMessage) {
-		this(exceptionMessage, null);
-	}
-
-	public UnsupportedSchemeFactory(String exceptionMessage, @Nullable Throwable exceptionCause) {
-		this.exceptionMessage = checkNotNull(exceptionMessage);
-		this.exceptionCause = exceptionCause;
-	}
-
-	@Override
-	public void configure(Configuration config) {
-		// nothing to do here
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		if (exceptionCause == null) {
-			throw new UnsupportedFileSystemSchemeException(exceptionMessage);
-		}
-		else {
-			throw new UnsupportedFileSystemSchemeException(exceptionMessage, exceptionCause);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
new file mode 100644
index 0000000..7cbc2bd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.net.URI;
+
+/**
+ * A factory for the {@link LocalFileSystem}.
+ */
+@PublicEvolving
+public class LocalFileSystemFactory implements FileSystemFactory {
+
+	@Override
+	public String getScheme() {
+		return LocalFileSystem.getLocalFsURI().getScheme();
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		// the local file system takes no configuration, so nothing to do here
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) {
+		return LocalFileSystem.getSharedInstance();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
index be73798..01f8680 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
@@ -76,12 +76,6 @@ public class DelimitedInputFormatSamplingTest {
 	@BeforeClass
 	public static void initialize() {
 		try {
-			TestFileSystem.registerTestFileSysten();
-		} catch (Throwable t) {
-			Assert.fail("Could not setup the mock test filesystem.");
-		}
-		
-		try {
 			// make sure we do 4 samples
 			CONFIG = TestConfigUtils.loadGlobalConf(
 				new String[] { ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
index 1e5a7b0..b799152 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -19,9 +19,7 @@
 package org.apache.flink.testutils;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.URI;
-import java.util.Map;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -32,8 +30,14 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileStatus;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 
+/**
+ * A test file system. This also has a service entry in the test
+ * resources, to be loaded during tests.
+ */
 public class TestFileSystem extends LocalFileSystem {
-	
+
+	public static final String SCHEME = "test";
+
 	private static int streamOpenCounter;
 	
 	public static int getNumtimeStreamOpened() {
@@ -74,24 +78,17 @@ public class TestFileSystem extends LocalFileSystem {
 
 	@Override
 	public URI getUri() {
-		return URI.create("test:///");
-	}
-
-	public static void registerTestFileSysten() throws Exception {
-		Class<FileSystem> fsClass = FileSystem.class;
-		Field dirField = fsClass.getDeclaredField("FS_FACTORIES");
-
-		dirField.setAccessible(true);
-		@SuppressWarnings("unchecked")
-		Map<String, FileSystemFactory> map = (Map<String, FileSystemFactory>) dirField.get(null);
-		dirField.setAccessible(false);
-
-		map.put("test", new TestFileSystemFactory());
+		return URI.create(SCHEME + ":///");
 	}
 
 	// ------------------------------------------------------------------------
 
-	private static final class TestFileSystemFactory implements FileSystemFactory {
+	public static final class TestFileSystemFactory implements FileSystemFactory {
+
+		@Override
+		public String getScheme() {
+			return SCHEME;
+		}
 
 		@Override
 		public void configure(Configuration config) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..5a3a31d
--- /dev/null
+++ b/flink-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.testutils.TestFileSystem$TestFileSystemFactory
\ No newline at end of file
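
For context: the service file above is what makes the test file system discoverable at
runtime. A minimal sketch, assuming only the JDK's java.util.ServiceLoader and the
FileSystemFactory interface from this change, of how such META-INF/services entries are
picked up (the class name and loop are illustrative, not the actual FileSystem
initialization code):

    import java.util.ServiceLoader;

    import org.apache.flink.core.fs.FileSystemFactory;

    public class FactoryDiscoverySketch {

        public static void main(String[] args) {
            // ServiceLoader scans every META-INF/services/org.apache.flink.core.fs.FileSystemFactory
            // file on the classpath and instantiates each factory listed there.
            ServiceLoader<FileSystemFactory> serviceLoader = ServiceLoader.load(FileSystemFactory.class);

            for (FileSystemFactory factory : serviceLoader) {
                System.out.println(factory.getScheme() + " -> " + factory.getClass().getName());
            }
        }
    }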

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index ffad448..8120cd8 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -205,6 +205,21 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- Default file system support. The Hadoop and MapR dependencies -->
+		<!--    are optional, so they are not added to the dist jar        -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-mapr-fs</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- Concrete logging framework - we add this only here (and not in the 
 			root POM) to not tie the projects to one specific framework and make
 			it easier for users to swap logging frameworks -->

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/pom.xml b/flink-filesystems/flink-hadoop-fs/pom.xml
new file mode 100644
index 0000000..7cb4b4c
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-filesystems</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-hadoop-fs</artifactId>
+	<name>flink-hadoop-fs</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- Hadoop as an optional dependency, so we can hard depend on this without -->
+		<!-- pulling in Hadoop by default -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-hadoop2</artifactId>
+			<version>${project.version}</version>
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
new file mode 100644
index 0000000..1484c95
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.BlockLocation;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implementation of the {@link BlockLocation} interface for the
+ * Hadoop Distributed File System.
+ */
+public final class HadoopBlockLocation implements BlockLocation {
+
+	/**
+	 * Specifies the character separating the hostname from the domain name.
+	 */
+	private static final char DOMAIN_SEPARATOR = '.';
+
+	/**
+	 * Regular expression for an IPv4 address.
+	 */
+	private static final Pattern IPV4_PATTERN = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");
+
+	/**
+	 * The original Hadoop block location object.
+	 */
+	private final org.apache.hadoop.fs.BlockLocation blockLocation;
+
+	/**
+	 * Stores the hostnames without the domain suffix.
+	 */
+	private String[] hostnames;
+
+	/**
+	 * Creates a new block location.
+	 *
+	 * @param blockLocation
+	 *        the original HDFS block location
+	 */
+	public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
+
+		this.blockLocation = blockLocation;
+	}
+
+	@Override
+	public String[] getHosts() throws IOException {
+
+		/*
+		 * Unfortunately, the Hadoop API is not precise about whether the list returned by BlockLocation.getHosts()
+		 * contains the hostnames with their respective domain suffix (FQDN or not). We have witnessed both versions,
+		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to
+		 * make sure it does not contain the domain suffix.
+		 */
+		if (this.hostnames == null) {
+
+			final String[] hadoopHostnames = blockLocation.getHosts();
+			this.hostnames = new String[hadoopHostnames.length];
+
+			for (int i = 0; i < hadoopHostnames.length; ++i) {
+				this.hostnames[i] = stripHostname(hadoopHostnames[i]);
+			}
+		}
+
+		return this.hostnames;
+	}
+
+	/**
+	 * Looks for a domain suffix in a FQDN and strips it if present.
+	 *
+	 * @param originalHostname
+	 *        the original hostname, possibly an FQDN
+	 * @return the stripped hostname without the domain suffix
+	 */
+	private static String stripHostname(final String originalHostname) {
+
+		// Check if the hostname contains the domain separator character
+		final int index = originalHostname.indexOf(DOMAIN_SEPARATOR);
+		if (index == -1) {
+			return originalHostname;
+		}
+
+		// Make sure we are not stripping an IPv4 address
+		final Matcher matcher = IPV4_PATTERN.matcher(originalHostname);
+		if (matcher.matches()) {
+			return originalHostname;
+		}
+
+		if (index == 0) {
+			throw new IllegalStateException("Hostname " + originalHostname + " starts with a " + DOMAIN_SEPARATOR);
+		}
+
+		return originalHostname.substring(0, index);
+	}
+
+	@Override
+	public long getLength() {
+
+		return this.blockLocation.getLength();
+	}
+
+	@Override
+	public long getOffset() {
+
+		return this.blockLocation.getOffset();
+	}
+
+	@Override
+	public int compareTo(final BlockLocation o) {
+
+		final long diff = getOffset() - o.getOffset();
+
+		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
+	}
+}
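
To make the stripping rule above concrete, a small standalone sketch that restates the
same logic outside the class (the class name and sample hostnames are illustrative):

    import java.util.regex.Pattern;

    public class StripHostnameSketch {

        private static final Pattern IPV4 = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");

        // Same rule as HadoopBlockLocation#stripHostname: cut at the first '.',
        // but leave hostnames without a dot and IPv4 addresses untouched.
        static String strip(String host) {
            int index = host.indexOf('.');
            if (index == -1 || IPV4.matcher(host).matches()) {
                return host;
            }
            if (index == 0) {
                throw new IllegalStateException("Hostname " + host + " starts with a '.'");
            }
            return host.substring(0, index);
        }

        public static void main(String[] args) {
            System.out.println(strip("worker-1.example.com")); // prints "worker-1"
            System.out.println(strip("worker-1"));             // prints "worker-1"
            System.out.println(strip("10.0.0.42"));            // prints "10.0.0.42"
        }
    }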

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
new file mode 100644
index 0000000..da50c4c
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Concrete implementation of the {@link FSDataInputStream} for Hadoop's input streams.
+ * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
+ */
+public final class HadoopDataInputStream extends FSDataInputStream {
+
+	/**
+	 * Minimum amount of bytes to skip forward before we issue a seek instead of discarding read.
+	 *
+	 * <p>The current value is just a magic number. In the long run, this value could become configurable, but for now it
+	 * is a conservative, relatively small value that should bring safe improvements for small skips (e.g. in reading
+	 * meta data), which would otherwise hurt the most from frequent seeks.
+	 *
+	 * <p>The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
+	 * For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
+	 * avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
+	 * be the amount of bytes that can be consumed sequentially within the seek time. Unfortunately, seek time is not
+	 * constant, and devices, OS, and DFS potentially also use read buffers and read-ahead.
+	 */
+	public static final int MIN_SKIP_BYTES = 1024 * 1024;
+
+	/** The internal stream. */
+	private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
+
+	/**
+	 * Creates a new data input stream from the given Hadoop input stream.
+	 *
+	 * @param fsDataInputStream The Hadoop input stream
+	 */
+	public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
+		this.fsDataInputStream = checkNotNull(fsDataInputStream);
+	}
+
+	@Override
+	public void seek(long seekPos) throws IOException {
+		// We do some optimizations to avoid that some implementations of distributed FS perform
+		// expensive seeks when they are actually not needed.
+		long delta = seekPos - getPos();
+
+		if (delta > 0L && delta <= MIN_SKIP_BYTES) {
+			// Instead of a small forward seek, we skip over the gap
+			skipFully(delta);
+		} else if (delta != 0L) {
+			// For larger gaps and backward seeks, we do a real seek
+			forceSeek(seekPos);
+		} // Do nothing if delta is zero.
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return fsDataInputStream.getPos();
+	}
+
+	@Override
+	public int read() throws IOException {
+		return fsDataInputStream.read();
+	}
+
+	@Override
+	public void close() throws IOException {
+		fsDataInputStream.close();
+	}
+
+	@Override
+	public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
+		return fsDataInputStream.read(buffer, offset, length);
+	}
+
+	@Override
+	public int available() throws IOException {
+		return fsDataInputStream.available();
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		return fsDataInputStream.skip(n);
+	}
+
+	/**
+	 * Gets the wrapped Hadoop input stream.
+	 * @return The wrapped Hadoop input stream.
+	 */
+	public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
+		return fsDataInputStream;
+	}
+
+	/**
+	 * Positions the stream at the given location. In contrast to {@link #seek(long)}, this method will
+	 * always issue a "seek" command to the dfs and will not replace it with {@link #skip(long)} for small
+	 * forward seeks.
+	 *
+	 * <p>Notice that the underlying DFS implementation can still decide to skip instead of seek.
+	 *
+	 * @param seekPos the position to seek to.
+	 * @throws IOException Thrown if seeking to the given position fails on the underlying stream.
+	 */
+	public void forceSeek(long seekPos) throws IOException {
+		fsDataInputStream.seek(seekPos);
+	}
+
+	/**
+	 * Skips over a given amount of bytes in the stream.
+	 *
+	 * @param bytes the number of bytes to skip.
+	 * @throws IOException Thrown if skipping the given number of bytes fails on the underlying stream.
+	 */
+	public void skipFully(long bytes) throws IOException {
+		while (bytes > 0) {
+			bytes -= fsDataInputStream.skip(bytes);
+		}
+	}
+}
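
A short usage sketch for the seek optimization above; how the underlying Hadoop stream
is opened is assumed here, not shown:

    import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

    import java.io.IOException;

    public class SeekSketch {

        // 'rawStream' stands for a stream opened on some Hadoop file system.
        static void demoSeeks(org.apache.hadoop.fs.FSDataInputStream rawStream) throws IOException {
            HadoopDataInputStream in = new HadoopDataInputStream(rawStream);

            // small forward gap (<= MIN_SKIP_BYTES): translated into skipFully(), no real seek
            in.seek(in.getPos() + 4096);

            // large forward gap: a real seek is issued via forceSeek()
            in.seek(in.getPos() + 64 * 1024 * 1024);

            // backward movement always results in a real seek
            in.seek(0L);
        }
    }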

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
new file mode 100644
index 0000000..1b8d1a3
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+
+/**
+ * Concrete implementation of the {@link FSDataOutputStream} for Hadoop's output streams.
+ * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
+ */
+public class HadoopDataOutputStream extends FSDataOutputStream {
+
+	private final org.apache.hadoop.fs.FSDataOutputStream fdos;
+
+	public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
+		if (fdos == null) {
+			throw new NullPointerException();
+		}
+		this.fdos = fdos;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		fdos.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		fdos.write(b, off, len);
+	}
+
+	@Override
+	public void close() throws IOException {
+		fdos.close();
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return fdos.getPos();
+	}
+
+	@Override
+	public void flush() throws IOException {
+		fdos.hflush();
+	}
+
+	@Override
+	public void sync() throws IOException {
+		fdos.hsync();
+	}
+
+	/**
+	 * Gets the wrapped Hadoop output stream.
+	 * @return The wrapped Hadoop output stream.
+	 */
+	public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream() {
+		return fdos;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
new file mode 100644
index 0000000..17bb334
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+
+/**
+ * Concrete implementation of the {@link FileStatus} interface for the
+ * Hadoop Distributed File System.
+ */
+public final class HadoopFileStatus implements FileStatus {
+
+	private org.apache.hadoop.fs.FileStatus fileStatus;
+
+	/**
+	 * Creates a new file status from a HDFS file status.
+	 *
+	 * @param fileStatus
+	 *        the HDFS file status
+	 */
+	public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
+		this.fileStatus = fileStatus;
+	}
+
+	@Override
+	public long getLen() {
+		return fileStatus.getLen();
+	}
+
+	@Override
+	public long getBlockSize() {
+		long blocksize = fileStatus.getBlockSize();
+		if (blocksize > fileStatus.getLen()) {
+			return fileStatus.getLen();
+		}
+
+		return blocksize;
+	}
+
+	@Override
+	public long getAccessTime() {
+		return fileStatus.getAccessTime();
+	}
+
+	@Override
+	public long getModificationTime() {
+		return fileStatus.getModificationTime();
+	}
+
+	@Override
+	public short getReplication() {
+		return fileStatus.getReplication();
+	}
+
+	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
+		return this.fileStatus;
+	}
+
+	@Override
+	public Path getPath() {
+		return new Path(fileStatus.getPath().toString());
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public boolean isDir() {
+		return fileStatus.isDir();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
new file mode 100644
index 0000000..5970c9d
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link FileSystem} that wraps an {@link org.apache.hadoop.fs.FileSystem Hadoop File System}.
+ */
+public class HadoopFileSystem extends FileSystem {
+
+	/** The wrapped Hadoop File System. */
+	private final org.apache.hadoop.fs.FileSystem fs;
+
+	/**
+	 * Wraps the given Hadoop File System object as a Flink File System object.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+	 */
+	public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+	}
+
+	/**
+	 * Gets the underlying Hadoop FileSystem.
+	 * @return The underlying Hadoop FileSystem.
+	 */
+	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
+		return this.fs;
+	}
+
+	// ------------------------------------------------------------------------
+	//  file system methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Path getWorkingDirectory() {
+		return new Path(this.fs.getWorkingDirectory().toUri());
+	}
+
+	public Path getHomeDirectory() {
+		return new Path(this.fs.getHomeDirectory().toUri());
+	}
+
+	@Override
+	public URI getUri() {
+		return fs.getUri();
+	}
+
+	@Override
+	public FileStatus getFileStatus(final Path f) throws IOException {
+		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
+		return new HadoopFileStatus(status);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
+			throws IOException {
+		if (!(file instanceof HadoopFileStatus)) {
+			throw new IOException("file is not an instance of HadoopFileStatus");
+		}
+
+		final HadoopFileStatus f = (HadoopFileStatus) file;
+
+		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
+			start, len);
+
+		// Wrap up HDFS specific block location objects
+		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
+		for (int i = 0; i < distBlkLocations.length; i++) {
+			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
+		}
+
+		return distBlkLocations;
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
+		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
+		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f) throws IOException {
+		final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
+		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	@SuppressWarnings("deprecation")
+	public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
+			final short replication, final long blockSize) throws IOException {
+		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
+			new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
+		return new HadoopDataOutputStream(fdos);
+	}
+
+	@Override
+	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
+		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
+			.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE);
+		return new HadoopDataOutputStream(fsDataOutputStream);
+	}
+
+	@Override
+	public boolean delete(final Path f, final boolean recursive) throws IOException {
+		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()), recursive);
+	}
+
+	@Override
+	public FileStatus[] listStatus(final Path f) throws IOException {
+		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
+		final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+		// Convert types
+		for (int i = 0; i < files.length; i++) {
+			files[i] = new HadoopFileStatus(hadoopFiles[i]);
+		}
+
+		return files;
+	}
+
+	@Override
+	public boolean mkdirs(final Path f) throws IOException {
+		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
+	}
+
+	@Override
+	public boolean rename(final Path src, final Path dst) throws IOException {
+		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
+			new org.apache.hadoop.fs.Path(dst.toString()));
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public long getDefaultBlockSize() {
+		return this.fs.getDefaultBlockSize();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return true;
+	}
+}
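
A minimal sketch of using this wrapper directly, assuming Hadoop is on the classpath;
the namenode address and path are placeholders:

    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

    import java.io.IOException;
    import java.net.URI;

    public class WrapSketch {

        public static void main(String[] args) throws IOException {
            // Obtain an initialized Hadoop file system for an (assumed) HDFS URI ...
            org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(
                    URI.create("hdfs://namenode:8020/"), new org.apache.hadoop.conf.Configuration());

            // ... and wrap it as a Flink file system.
            HadoopFileSystem flinkFs = new HadoopFileSystem(hadoopFs);

            try (FSDataInputStream in = flinkFs.open(new Path("/assumed/path/data.bin"))) {
                System.out.println("first byte: " + in.read());
            }
        }
    }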

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
new file mode 100644
index 0000000..50e64e1
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system factory for Hadoop-based file systems.
+ *
+ * <p>This factory calls Hadoop's mechanism to find a file system implementation for a given file
+ * system scheme (a {@link org.apache.hadoop.fs.FileSystem}) and wraps it as a Flink file system
+ * (a {@link org.apache.flink.core.fs.FileSystem}).
+ */
+public class HadoopFsFactory implements FileSystemFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopFsFactory.class);
+
+	/** Flink's configuration object. */
+	private Configuration flinkConfig;
+
+	/** Hadoop's configuration for the file systems. */
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	@Override
+	public String getScheme() {
+		// the hadoop factory creates various schemes
+		return "*";
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		flinkConfig = config;
+		hadoopConfig = null; // reset the Hadoop Config
+	}
+
+	@Override
+	public HadoopFileSystem create(URI fsUri) throws IOException {
+		checkNotNull(fsUri, "fsUri");
+
+		final String scheme = fsUri.getScheme();
+		checkArgument(scheme != null, "file system has null scheme");
+
+		// from here on, we need to handle errors due to missing optional
+		// dependency classes
+		try {
+			// -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
+
+			final org.apache.hadoop.conf.Configuration hadoopConfig;
+			if (this.hadoopConfig != null) {
+				hadoopConfig = this.hadoopConfig;
+			}
+			else if (flinkConfig != null) {
+				hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
+				this.hadoopConfig = hadoopConfig;
+			}
+			else {
+				LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
+						+ " Using configuration from the classpath.");
+
+				hadoopConfig = new org.apache.hadoop.conf.Configuration();
+			}
+
+			// -- (2) get the Hadoop file system class for that scheme
+
+			final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
+			try {
+				fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
+			}
+			catch (IOException e) {
+				throw new UnsupportedFileSystemSchemeException(
+						"Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
+								"Either no file system implementation exists for that scheme, " +
+								"or the relevant classes are missing from the classpath.", e);
+			}
+
+			// -- (3) instantiate the Hadoop file system
+
+			LOG.debug("Instantiating Hadoop file system {} for file system scheme {}", fsClass.getName(), scheme);
+
+			final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
+
+			// -- (4) create the proper URI to initialize the file system
+
+			final URI initUri;
+			if (fsUri.getAuthority() != null) {
+				initUri = fsUri;
+			}
+			else {
+				LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)", fsUri);
+
+				String configEntry = hadoopConfig.get("fs.defaultFS", null);
+				if (configEntry == null) {
+					// fs.default.name deprecated as of hadoop 2.2.0 - see
+					// http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
+					configEntry = hadoopConfig.get("fs.default.name", null);
+				}
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
+				}
+
+				if (configEntry == null) {
+					throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
+							"Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
+				}
+				else {
+					try {
+						initUri = URI.create(configEntry);
+					}
+					catch (IllegalArgumentException e) {
+						throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
+								"The configuration contains an invalid file system default name " +
+								"('fs.default.name' or 'fs.defaultFS'): " + configEntry);
+					}
+
+					if (initUri.getAuthority() == null) {
+						throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
+								"Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
+								"contains no valid authority component (like hdfs namenode, S3 host, etc)");
+					}
+				}
+			}
+
+			// -- (5) configure the Hadoop file system
+
+			try {
+				hadoopFs.initialize(initUri, hadoopConfig);
+			}
+			catch (UnknownHostException e) {
+				String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
+						"), specified by either the file URI or the configuration, cannot be resolved.";
+
+				throw new IOException(message, e);
+			}
+
+			// all good, return the file system
+			return new HadoopFileSystem(hadoopFs);
+		}
+		catch (ReflectiveOperationException | LinkageError e) {
+			throw new UnsupportedFileSystemSchemeException("Cannot support file system for '" + fsUri.getScheme() +
+					"' via Hadoop, because Hadoop is not in the classpath, or some classes " +
+					"are missing from the classpath.", e);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
+		}
+	}
+
+	private static String getMissingAuthorityErrorPrefix(URI fsURI) {
+		return "The given file system URI (" + fsURI.toString() + ") did not describe the authority " +
+				"(like for example HDFS NameNode address/port or S3 host). " +
+				"The attempt to use a configured default authority failed: ";
+	}
+}
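
A hedged sketch of the factory contract as implemented above: configure() hands over
Flink's configuration, create() then resolves and initializes the Hadoop file system
for the URI's scheme (the URI is a placeholder):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
    import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;

    import java.io.IOException;
    import java.net.URI;

    public class FactoryUseSketch {

        public static void main(String[] args) throws IOException {
            HadoopFsFactory factory = new HadoopFsFactory();

            // Hand over Flink's configuration so the factory can locate the Hadoop config files.
            factory.configure(new Configuration());

            // Resolves the Hadoop FileSystem class for "hdfs", instantiates and initializes it,
            // and returns it wrapped as a Flink file system.
            HadoopFileSystem fs = factory.create(URI.create("hdfs://namenode:8020/"));

            System.out.println("working directory: " + fs.getWorkingDirectory());
        }
    }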

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
new file mode 100644
index 0000000..8bfcb5c
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+
+/**
+ * Utility class for working with Hadoop-related classes. This should only be used if Hadoop
+ * is on the classpath.
+ */
+public class HadoopUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+
+	private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
+	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
+
+		Configuration result = new Configuration();
+		boolean foundHadoopConfiguration = false;
+
+		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
+		// the hdfs configuration
+		// Try to load HDFS configuration from Hadoop's own configuration files
+		// 1. approach: Flink configuration
+		final String hdfsDefaultPath =
+			flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+
+		if (hdfsDefaultPath != null) {
+			result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
+			LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
+			foundHadoopConfiguration = true;
+		} else {
+			LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
+		}
+
+		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+		if (hdfsSitePath != null) {
+			result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
+			LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
+			foundHadoopConfiguration = true;
+		} else {
+			LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
+		}
+
+		// 2. approach: environment variables
+		String[] possibleHadoopConfPaths = new String[4];
+		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+		final String hadoopHome = System.getenv("HADOOP_HOME");
+		if (hadoopHome != null) {
+			possibleHadoopConfPaths[2] = hadoopHome + "/conf";
+			possibleHadoopConfPaths[3] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+		}
+
+		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+			if (possibleHadoopConfPath != null) {
+				if (new File(possibleHadoopConfPath).exists()) {
+					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+						LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+						foundHadoopConfiguration = true;
+					}
+					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+						LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+						foundHadoopConfiguration = true;
+					}
+				}
+			}
+		}
+
+		if (!foundHadoopConfiguration) {
+			LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
+				"(Flink configuration, environment variables).");
+		}
+
+		return result;
+	}
+
+	/**
+	 * Indicates whether the current user has an HDFS delegation token.
+	 */
+	public static boolean hasHDFSDelegationToken() throws Exception {
+		UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+		Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+		for (Token<? extends TokenIdentifier> token : usrTok) {
+			if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+				return true;
+			}
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
new file mode 100644
index 0000000..bb2c492
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that validate the loading of the Hadoop configuration, based on
+ * entries in the Flink configuration and on environment variables.
+ */
+public class HadoopConfigLoadingTest {
+
+	private static final String IN_CP_CONFIG_KEY = "cp_conf_key";
+	private static final String IN_CP_CONFIG_VALUE = "oompf!";
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void loadFromClasspathByDefault() {
+		org.apache.hadoop.conf.Configuration hadoopConf =
+				HadoopUtils.getHadoopConfiguration(new Configuration());
+
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadFromLegacyConfigEntries() throws Exception {
+		final String k1 = "shipmate";
+		final String v1 = "smooth sailing";
+
+		final String k2 = "pirate";
+		final String v2 = "Arrg, yer scurvy dog!";
+
+		final File file1 = tempFolder.newFile("core-site.xml");
+		final File file2 = tempFolder.newFile("hdfs-site.xml");
+
+		printConfig(file1, k1, v1);
+		printConfig(file2, k2, v2);
+
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, file1.getAbsolutePath());
+		cfg.setString(ConfigConstants.HDFS_SITE_CONFIG, file2.getAbsolutePath());
+
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(cfg);
+
+		// contains extra entries
+		assertEquals(v1, hadoopConf.get(k1, null));
+		assertEquals(v2, hadoopConf.get(k2, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadFromHadoopConfEntry() throws Exception {
+		final String k1 = "singing?";
+		final String v1 = "rain!";
+
+		final String k2 = "dancing?";
+		final String v2 = "shower!";
+
+		final File confDir = tempFolder.newFolder();
+
+		final File file1 = new File(confDir, "core-site.xml");
+		final File file2 = new File(confDir, "hdfs-site.xml");
+
+		printConfig(file1, k1, v1);
+		printConfig(file2, k2, v2);
+
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.PATH_HADOOP_CONFIG, confDir.getAbsolutePath());
+
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(cfg);
+
+		// contains extra entries
+		assertEquals(v1, hadoopConf.get(k1, null));
+		assertEquals(v2, hadoopConf.get(k2, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadFromEnvVariables() throws Exception {
+		final String k1 = "where?";
+		final String v1 = "I'm on a boat";
+		final String k2 = "when?";
+		final String v2 = "midnight";
+		final String k3 = "why?";
+		final String v3 = "what do you think?";
+		final String k4 = "which way?";
+		final String v4 = "south, always south...";
+		final String k5 = "how long?";
+		final String v5 = "an eternity";
+		final String k6 = "for real?";
+		final String v6 = "quite so...";
+
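+		// one distinct key/value pair per candidate file, covering all three lookup
+		// locations: HADOOP_CONF_DIR, HADOOP_HOME/conf, and HADOOP_HOME/etc/hadoop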
+		final File hadoopConfDir = tempFolder.newFolder();
+
+		final File hadoopHome = tempFolder.newFolder();
+
+		final File hadoopHomeConf = new File(hadoopHome, "conf");
+		final File hadoopHomeEtc = new File(hadoopHome, "etc/hadoop");
+
+		assertTrue(hadoopHomeConf.mkdirs());
+		assertTrue(hadoopHomeEtc.mkdirs());
+
+		final File file1 = new File(hadoopConfDir, "core-site.xml");
+		final File file2 = new File(hadoopConfDir, "hdfs-site.xml");
+		final File file3 = new File(hadoopHomeConf, "core-site.xml");
+		final File file4 = new File(hadoopHomeConf, "hdfs-site.xml");
+		final File file5 = new File(hadoopHomeEtc, "core-site.xml");
+		final File file6 = new File(hadoopHomeEtc, "hdfs-site.xml");
+
+		printConfig(file1, k1, v1);
+		printConfig(file2, k2, v2);
+		printConfig(file3, k3, v3);
+		printConfig(file4, k4, v4);
+		printConfig(file5, k5, v5);
+		printConfig(file6, k6, v6);
+
+		final org.apache.hadoop.conf.Configuration hadoopConf;
+
+		final Map<String, String> originalEnv = System.getenv();
+		final Map<String, String> newEnv = new HashMap<>(originalEnv);
+		newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+		newEnv.put("HADOOP_HOME", hadoopHome.getAbsolutePath());
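+		// CommonTestUtils.setEnv swaps the environment seen by System.getenv(),
+		// so the original environment must be restored in the finally block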
+		try {
+			CommonTestUtils.setEnv(newEnv);
+			hadoopConf = HadoopUtils.getHadoopConfiguration(new Configuration());
+		}
+		finally {
+			CommonTestUtils.setEnv(originalEnv);
+		}
+
+		// contains extra entries
+		assertEquals(v1, hadoopConf.get(k1, null));
+		assertEquals(v2, hadoopConf.get(k2, null));
+		assertEquals(v3, hadoopConf.get(k3, null));
+		assertEquals(v4, hadoopConf.get(k4, null));
+		assertEquals(v5, hadoopConf.get(k5, null));
+		assertEquals(v6, hadoopConf.get(k6, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
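+	/**
+	 * Writes a minimal Hadoop-style XML configuration file containing a single property.
+	 */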
+	private static void printConfig(File file, String key, String value) throws IOException {
+		try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
+			out.println("<?xml version=\"1.0\"?>");
+			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+			out.println("<configuration>");
+			out.println("\t<property>");
+			out.println("\t\t<name>" + key + "</name>");
+			out.println("\t\t<value>" + value + "</value>");
+			out.println("\t</property>");
+			out.println("</configuration>");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77e3701c/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
new file mode 100644
index 0000000..c6cf0eb
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the {@link HadoopDataInputStream}.
+ */
+public class HadoopDataInputStreamTest {
+
+	private FSDataInputStream verifyInputStream;
+	private HadoopDataInputStream testInputStream;
+
+	@Test
+	public void testSeekSkip() throws IOException {
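+		// back the stream with twice MIN_SKIP_BYTES so that both the skip-based
+		// and the seek-based repositioning paths can be exercised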
+		verifyInputStream = spy(new FSDataInputStream(new SeekableByteArrayInputStream(new byte[2 * HadoopDataInputStream.MIN_SKIP_BYTES])));
+		testInputStream = new HadoopDataInputStream(verifyInputStream);
+		seekAndAssert(10);
+		seekAndAssert(10 + HadoopDataInputStream.MIN_SKIP_BYTES + 1);
+		seekAndAssert(testInputStream.getPos() - 1);
+		seekAndAssert(testInputStream.getPos() + 1);
+		seekAndAssert(testInputStream.getPos() - HadoopDataInputStream.MIN_SKIP_BYTES);
+		seekAndAssert(testInputStream.getPos());
+		seekAndAssert(0);
+		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES);
+		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES - 1);
+
+		try {
+			seekAndAssert(-1);
+			Assert.fail();
+		} catch (Exception ignore) {
+		}
+
+		try {
+			seekAndAssert(-HadoopDataInputStream.MIN_SKIP_BYTES - 1);
+			Assert.fail();
+		} catch (Exception ignore) {
+		}
+	}
+
+	private void seekAndAssert(long seekPos) throws IOException {
+		Assert.assertEquals(verifyInputStream.getPos(), testInputStream.getPos());
+		long delta = seekPos - testInputStream.getPos();
+		testInputStream.seek(seekPos);
+
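+		// small forward seeks (0 < delta <= MIN_SKIP_BYTES) should be realized via
+		// skip(); any other non-zero delta must fall back to an actual seek()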
+		if (delta > 0L && delta <= HadoopDataInputStream.MIN_SKIP_BYTES) {
+			verify(verifyInputStream, atLeastOnce()).skip(anyLong());
+			verify(verifyInputStream, never()).seek(anyLong());
+		} else if (delta != 0L) {
+			verify(verifyInputStream, atLeastOnce()).seek(seekPos);
+			verify(verifyInputStream, never()).skip(anyLong());
+		} else {
+			verify(verifyInputStream, never()).seek(anyLong());
+			verify(verifyInputStream, never()).skip(anyLong());
+		}
+
+		Assert.assertEquals(seekPos, verifyInputStream.getPos());
+		reset(verifyInputStream);
+	}
+
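+	/**
+	 * In-memory seekable stream backing the spied {@link FSDataInputStream};
+	 * positioned reads are not exercised by these tests and remain unsupported.
+	 */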
+	private static final class SeekableByteArrayInputStream
+		extends ByteArrayInputStreamWithPos
+		implements Seekable, PositionedReadable {
+
+		public SeekableByteArrayInputStream(byte[] buffer) {
+			super(buffer);
+		}
+
+		@Override
+		public void seek(long pos) throws IOException {
+			setPosition((int) pos);
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return getPosition();
+		}
+
+		@Override
+		public boolean seekToNewSource(long targetPos) throws IOException {
+			return false;
+		}
+
+		@Override
+		public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void readFully(long position, byte[] buffer) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+}