You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/31 08:19:50 UTC

[flink] branch master updated (a9d3931 -> 8bdcb00)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a9d3931  [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn
     new 0033448  [hotfix][tests] Refactor MapR FS Tests
     new 8bdcb00  [FLINK-13499][maprfs] Handle MapR dependency purely through reflection

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-filesystems/flink-mapr-fs/pom.xml            |  38 +----
 .../flink/runtime/fs/maprfs/MapRFileSystem.java    | 181 ---------------------
 .../flink/runtime/fs/maprfs/MapRFsFactory.java     | 170 ++++++++++++++++++-
 .../src/test/java/com/mapr/fs/MapRFileSystem.java  |  90 ++++++++++
 .../runtime/fs/maprfs/FileSystemAccessTest.java    |  42 -----
 .../flink/runtime/fs/maprfs/MapRFreeTests.java     |  74 ---------
 .../flink/runtime/fs/maprfs/MapRFsFactoryTest.java |  86 ++--------
 .../runtime/fs/maprfs/MapRNotInClassPathTest.java  | 129 +++++++++++++++
 tools/travis/nightly.sh                            |   3 +-
 tools/travis_controller.sh                         |   3 +-
 tools/travis_watchdog.sh                           |   3 +-
 11 files changed, 411 insertions(+), 408 deletions(-)
 delete mode 100644 flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
 create mode 100644 flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java
 delete mode 100644 flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
 delete mode 100644 flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
 create mode 100644 flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRNotInClassPathTest.java


[flink] 01/02: [hotfix][tests] Refactor MapR FS Tests

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0033448f0bc92104e134a9dc7c53be9da936315d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 30 11:04:23 2019 +0200

    [hotfix][tests] Refactor MapR FS Tests
---
 .../runtime/fs/maprfs/FileSystemAccessTest.java    |  42 -------
 .../flink/runtime/fs/maprfs/MapRFreeTests.java     |  74 ------------
 .../flink/runtime/fs/maprfs/MapRFsFactoryTest.java |  86 +++-----------
 .../runtime/fs/maprfs/MapRNotInClassPathTest.java  | 129 +++++++++++++++++++++
 4 files changed, 146 insertions(+), 185 deletions(-)

diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
deleted file mode 100644
index aee50ba..0000000
--- a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/FileSystemAccessTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.maprfs;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * This test checks that the file system is properly accessible through the
- * service loading abstraction.
- */
-public class FileSystemAccessTest extends TestLogger {
-
-	@Test
-	public void testGetMapRFs() throws Exception {
-		final Path path = new Path("maprfs:///my/path");
-
-		FileSystem fs = path.getFileSystem();
-		assertEquals(path.toUri().getScheme(), fs.getUri().getScheme());
-	}
-}
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
deleted file mode 100644
index 110fce3..0000000
--- a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFreeTests.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.maprfs;
-
-import org.apache.flink.configuration.Configuration;
-
-import java.io.IOException;
-import java.net.URI;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * A class with tests that require to be run in a MapR/Hadoop-free environment,
- * to test proper error handling when no Hadoop classes are available.
- *
- * <p>This class must be dynamically loaded in a MapR/Hadoop-free class loader.
- */
-// this class is only instantiated via reflection
-@SuppressWarnings("unused")
-public class MapRFreeTests {
-
-	public static void test() throws Exception {
-		// make sure no MapR or Hadoop FS classes are in the classpath
-		try {
-			Class.forName("com.mapr.fs.MapRFileSystem");
-			fail("Cannot run test when MapR classes are in the classpath");
-		}
-		catch (ClassNotFoundException ignored) {}
-
-		try {
-			Class.forName("org.apache.hadoop.fs.FileSystem");
-			fail("Cannot run test when Hadoop classes are in the classpath");
-		}
-		catch (ClassNotFoundException ignored) {}
-
-		try {
-			Class.forName("org.apache.hadoop.conf.Configuration");
-			fail("Cannot run test when Hadoop classes are in the classpath");
-		}
-		catch (ClassNotFoundException ignored) {}
-
-		// this method should complete without a linkage error
-		final MapRFsFactory factory = new MapRFsFactory();
-
-		// this method should also complete without a linkage error
-		factory.configure(new Configuration());
-
-		try {
-			factory.create(new URI("maprfs://somehost:9000/root/dir"));
-			fail("This statement should fail with an exception");
-		}
-		catch (IOException e) {
-			assertTrue(e.getMessage().contains("MapR"));
-			assertTrue(e.getMessage().contains("classpath"));
-		}
-	}
-}
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
index 781c3d7..2c8357a 100644
--- a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactoryTest.java
@@ -18,103 +18,51 @@
 
 package org.apache.flink.runtime.fs.maprfs;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.util.ClassLoaderUtils;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the MapRFsFactory.
+ * Tests for the {@link MapRFsFactory}.
  */
 public class MapRFsFactoryTest extends TestLogger {
 
-	/**
-	 * This test validates that the factory can be instantiated and configured even
-	 * when MapR and Hadoop classes are missing from the classpath.
-	 */
 	@Test
-	public void testInstantiationWithoutMapRClasses() throws Exception {
-		// we do reflection magic here to instantiate the test in another class
-		// loader, to make sure no MapR and Hadoop classes are in the classpath
+	public void testMapRFsScheme() throws Exception {
+		final Path path = new Path("maprfs:///my/path");
 
-		final String testClassName = "org.apache.flink.runtime.fs.maprfs.MapRFreeTests";
+		final FileSystem fs = path.getFileSystem();
 
-		final URL[] urls = ClassLoaderUtils.getClasspathURLs();
+		assertEquals(path.toUri().getScheme(), fs.getUri().getScheme());
+	}
 
-		ClassLoader parent = getClass().getClassLoader();
-		ClassLoader maprFreeClassLoader = new MapRFreeClassLoader(urls, parent);
-		Class<?> testClass = Class.forName(testClassName, false, maprFreeClassLoader);
-		Method m = testClass.getDeclaredMethod("test");
+	@Test
+	public void testMapRFsKind() throws Exception {
+		final Path path = new Path("maprfs:///my/path");
 
-		try {
-			m.invoke(null);
-		}
-		catch (InvocationTargetException e) {
-			ExceptionUtils.rethrowException(e.getTargetException(), "exception in method");
-		}
+		final FileSystem fs = path.getFileSystem();
+
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
 	}
 
 	@Test
-	public void testCreateFsWithAuthority() throws Exception {
-		final URI uri = URI.create("maprfs://localhost:12345/");
-
-		MapRFsFactory factory = new MapRFsFactory();
+	public void testCreateWithAuthorityNoCldbFails() throws Exception {
+		final Path path = new Path("maprfs://localhost:12345/");
 
 		try {
-			factory.create(uri);
+			path.getFileSystem();
 			fail("should have failed with an exception");
 		}
 		catch (IOException e) {
 			// expected, because we have no CLDB config available
 		}
 	}
-
-	@Test
-	public void testCreateFsWithMissingAuthority() throws Exception {
-		final URI uri = URI.create("maprfs:///my/path");
-
-		MapRFsFactory factory = new MapRFsFactory();
-		factory.configure(new Configuration());
-
-		FileSystem fs = factory.create(uri);
-		assertEquals("maprfs", fs.getUri().getScheme());
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static final class MapRFreeClassLoader extends URLClassLoader {
-
-		private final ClassLoader properParent;
-
-		MapRFreeClassLoader(URL[] urls, ClassLoader parent) {
-			super(urls, null);
-			properParent = parent;
-		}
-
-		@Override
-		public Class<?> loadClass(String name) throws ClassNotFoundException {
-			if (name.startsWith("com.mapr") || name.startsWith("org.apache.hadoop")) {
-				throw new ClassNotFoundException(name);
-			}
-			else if (name.startsWith("org.apache.log4j")) {
-				return properParent.loadClass(name);
-			}
-			else {
-				return super.loadClass(name);
-			}
-		}
-	}
 }
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRNotInClassPathTest.java b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRNotInClassPathTest.java
new file mode 100644
index 0000000..2d274ee
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/org/apache/flink/runtime/fs/maprfs/MapRNotInClassPathTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ClassLoaderUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that must be run in a MapR/Hadoop-free environment, to verify proper
+ * error handling when no MapR or Hadoop classes are available.
+ *
+ * <p>The nested {@code TestRunner} class is dynamically loaded in a MapR/Hadoop-free class loader.
+ */
+public class MapRNotInClassPathTest extends TestLogger {
+
+	@Test
+	public void testInstantiationWhenMapRClassesAreMissing() throws Exception {
+		final String testClassName = "org.apache.flink.runtime.fs.maprfs.MapRNotInClassPathTest$TestRunner";
+		final ClassLoader cl = new MapRFreeClassLoader(getClass().getClassLoader());
+
+		final RunnableWithException testRunner = Class
+			.forName(testClassName, false, cl)
+			.asSubclass(RunnableWithException.class)
+			.newInstance();
+
+		testRunner.run();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The tests that need to run in a special classloader.
+	 */
+	@SuppressWarnings("unused")
+	public static final class TestRunner implements RunnableWithException {
+
+		@Override
+		public void run() throws Exception {
+			// make sure no MapR or Hadoop FS classes are in the classpath
+			try {
+				Class.forName("com.mapr.fs.MapRFileSystem");
+				fail("Cannot run test when MapR classes are in the classpath");
+			}
+			catch (ClassNotFoundException ignored) {}
+
+			try {
+				Class.forName("org.apache.hadoop.fs.FileSystem");
+				fail("Cannot run test when Hadoop classes are in the classpath");
+			}
+			catch (ClassNotFoundException ignored) {}
+
+			try {
+				Class.forName("org.apache.hadoop.conf.Configuration");
+				fail("Cannot run test when Hadoop classes are in the classpath");
+			}
+			catch (ClassNotFoundException ignored) {}
+
+			// instantiating the factory should complete without a linkage error
+			final MapRFsFactory factory = new MapRFsFactory();
+
+			// this method should also complete without a linkage error
+			factory.configure(new Configuration());
+
+			try {
+				factory.create(new URI("maprfs://somehost:9000/root/dir"));
+				fail("This statement should fail with an exception");
+			}
+			catch (IOException e) {
+				assertTrue(e.getMessage().contains("MapR"));
+				assertTrue(e.getMessage().contains("classpath"));
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A special classloader that filters "org.apache.hadoop.*" and "com.mapr.*" classes.
+	 */
+	private static final class MapRFreeClassLoader extends URLClassLoader {
+
+		private final ClassLoader properParent;
+
+		MapRFreeClassLoader(ClassLoader parent) {
+			super(ClassLoaderUtils.getClasspathURLs(), null);
+			properParent = parent;
+		}
+
+		@Override
+		public Class<?> loadClass(String name) throws ClassNotFoundException {
+			if (name.startsWith("com.mapr") || name.startsWith("org.apache.hadoop")) {
+				throw new ClassNotFoundException(name);
+			}
+			else if (name.equals(RunnableWithException.class.getName()) || name.startsWith("org.apache.log4j")) {
+				return properParent.loadClass(name);
+			}
+			else {
+				return super.loadClass(name);
+			}
+		}
+	}
+}


[flink] 02/02: [FLINK-13499][maprfs] Handle MapR dependency purely through reflection

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8bdcb001fd7bf8582691b5625cbc5b261ff00cb1
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 30 12:00:07 2019 +0200

    [FLINK-13499][maprfs] Handle MapR dependency purely through reflection
    
    This allows us to remove the MapR dependency from the module.
    The MapR maven dependency has frequently caused issues.
---
 flink-filesystems/flink-mapr-fs/pom.xml            |  38 +----
 .../flink/runtime/fs/maprfs/MapRFileSystem.java    | 181 ---------------------
 .../flink/runtime/fs/maprfs/MapRFsFactory.java     | 170 ++++++++++++++++++-
 .../src/test/java/com/mapr/fs/MapRFileSystem.java  |  90 ++++++++++
 tools/travis/nightly.sh                            |   3 +-
 tools/travis_controller.sh                         |   3 +-
 tools/travis_watchdog.sh                           |   3 +-
 7 files changed, 265 insertions(+), 223 deletions(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index e32954c..cbfd86e 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -32,35 +32,6 @@ under the License.
 
 	<packaging>jar</packaging>
 
-	<repositories>
-		<repository>
-			<id>mapr-releases</id>
-			<url>https://repository.mapr.com/maven/</url>
-			<snapshots><enabled>false</enabled></snapshots>
-			<releases><enabled>true</enabled></releases>
-		</repository>
-	</repositories>
-
-	<profiles>
-		<profile>
-			<id>unsafe-mapr-repo</id>
-			<activation>
-				<property>
-					<name>unsafe-mapr-repo</name>
-				</property>
-			</activation>
-			<repositories>
-				<!-- MapR -->
-				<repository>
-					<id>mapr-releases</id>
-					<url>http://repository.mapr.com/maven/</url>
-					<snapshots><enabled>false</enabled></snapshots>
-					<releases><enabled>true</enabled></releases>
-				</repository>
-			</repositories>
-		</profile>
-	</profiles>
-
 	<dependencies>
 
 		<dependency>
@@ -75,13 +46,10 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
-		<!-- MapR dependencies as optional dependency, so we can hard depend on this without -->
-		<!-- pulling in MapR libraries by default -->
-
 		<dependency>
-			<groupId>com.mapr.hadoop</groupId>
-			<artifactId>maprfs</artifactId>
-			<version>5.2.1-mapr</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-hadoop-2</artifactId>
+			<version>${hadoop.version}-${flink.shaded.version}</version>
 			<optional>true</optional>
 		</dependency>
 
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
deleted file mode 100644
index 5aec4a4..0000000
--- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.maprfs;
-
-import org.apache.flink.core.fs.FileSystemKind;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A MapR file system client for Flink.
- *
- * <p>Internally, this class wraps the {@link org.apache.hadoop.fs.FileSystem} implementation
- * of the MapR file system client.
- */
-public class MapRFileSystem extends HadoopFileSystem {
-
-	private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class);
-
-	/** Name of the environment variable to determine the location of the MapR
-	 * installation. */
-	private static final String MAPR_HOME_ENV = "MAPR_HOME";
-
-	/** The default location of the MapR installation. */
-	private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
-
-	/** The path relative to the MAPR_HOME where MapR stores how to access the
-	 * configured clusters. */
-	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a MapRFileSystem for the given URI.
-	 *
-	 * @param fsUri The URI describing the file system
-	 * @throws IOException Thrown if the file system could not be initialized.
-	 */
-	public MapRFileSystem(URI fsUri) throws IOException {
-		super(instantiateMapRFileSystem(fsUri));
-	}
-
-	private static org.apache.hadoop.fs.FileSystem instantiateMapRFileSystem(URI fsUri) throws IOException {
-		checkNotNull(fsUri, "fsUri");
-
-		final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-		final com.mapr.fs.MapRFileSystem fs;
-
-		final String authority = fsUri.getAuthority();
-		if (authority == null || authority.isEmpty()) {
-
-			// Use the default constructor to instantiate MapR file system object
-			fs = new com.mapr.fs.MapRFileSystem();
-		}
-		else {
-			// We have an authority, check the MapR cluster configuration to
-			// find the CLDB locations.
-			final String[] cldbLocations = getCLDBLocations(authority);
-			fs = new com.mapr.fs.MapRFileSystem(authority, cldbLocations);
-		}
-
-		// now initialize the Hadoop File System object
-		fs.initialize(fsUri, conf);
-
-		return fs;
-	}
-
-	/**
-	 * Retrieves the CLDB locations for the given MapR cluster name.
-	 *
-	 * @param authority
-	 *            the name of the MapR cluster
-	 * @return a list of CLDB locations
-	 * @throws IOException
-	 *             thrown if the CLDB locations for the given MapR cluster name
-	 *             cannot be determined
-	 */
-	private static String[] getCLDBLocations(String authority) throws IOException {
-
-		// Determine the MapR home
-		String maprHome = System.getenv(MAPR_HOME_ENV);
-		if (maprHome == null) {
-			maprHome = DEFAULT_MAPR_HOME;
-		}
-
-		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format(
-					"Trying to retrieve MapR cluster configuration from %s",
-					maprClusterConf));
-		}
-
-		if (!maprClusterConf.exists()) {
-			throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() +
-					"', assuming MapR home is '" + maprHome + "'.");
-		}
-
-		// Read the cluster configuration file, format is specified at
-		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
-
-		try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
-
-			String line;
-			while ((line = br.readLine()) != null) {
-
-				// Normalize the string
-				line = line.trim();
-				line = line.replace('\t', ' ');
-
-				final String[] fields = line.split(" ");
-				if (fields.length < 1) {
-					continue;
-				}
-
-				final String clusterName = fields[0];
-
-				if (!clusterName.equals(authority)) {
-					continue;
-				}
-
-				final List<String> cldbLocations = new ArrayList<>();
-
-				for (int i = 1; i < fields.length; ++i) {
-
-					// Make sure this is not a key-value pair MapR recently
-					// introduced in the file format along with their security
-					// features.
-					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
-						cldbLocations.add(fields[i]);
-					}
-				}
-
-				if (cldbLocations.isEmpty()) {
-					throw new IOException(
-							String.format(
-									"%s contains entry for cluster %s but no CLDB locations.",
-									maprClusterConf, authority));
-				}
-
-				return cldbLocations.toArray(new String[cldbLocations.size()]);
-			}
-
-		}
-
-		throw new IOException(String.format(
-				"Unable to find CLDB locations for cluster %s", authority));
-	}
-
-	@Override
-	public FileSystemKind getKind() {
-		return FileSystemKind.FILE_SYSTEM;
-	}
-}
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
index d738198..b89de68 100644
--- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
@@ -18,14 +18,23 @@
 
 package org.apache.flink.runtime.fs.maprfs;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -39,6 +48,20 @@ public class MapRFsFactory implements FileSystemFactory {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MapRFsFactory.class);
 
+	/** Name of the environment variable to determine the location of the MapR
+	 * installation. */
+	private static final String MAPR_HOME_ENV = "MAPR_HOME";
+
+	/** The default location of the MapR installation. */
+	private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
+
+	/** The path relative to the MAPR_HOME where MapR stores how to access the
+	 * configured clusters. */
+	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
+
+	/** Name of the class implementing the MapRFileSystem. */
+	private static final String MAPR_FS_CLASS_NAME = "com.mapr.fs.MapRFileSystem";
+
 	// ------------------------------------------------------------------------
 
 	@Override
@@ -50,10 +73,31 @@ public class MapRFsFactory implements FileSystemFactory {
 	public FileSystem create(URI fsUri) throws IOException {
 		checkNotNull(fsUri, "fsUri");
 
+		checkMaprFsClassInClassPath();
+
 		try {
 			LOG.info("Trying to load and instantiate MapR File System");
 
-			return new MapRFileSystem(fsUri);
+			final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+			final org.apache.hadoop.fs.FileSystem fs;
+
+			final String authority = fsUri.getAuthority();
+			if (authority == null || authority.isEmpty()) {
+
+				// Use the default constructor to instantiate MapR file system object
+				fs = instantiateMapRFsClass();
+			}
+			else {
+				// We have an authority, check the MapR cluster configuration to
+				// find the CLDB locations.
+				final String[] cldbLocations = getCLDBLocations(authority);
+				fs = instantiateMapRFsClass(authority, cldbLocations);
+			}
+
+			// now initialize the Hadoop File System object
+			fs.initialize(fsUri, conf);
+
+			return new HadoopFileSystem(fs);
 		}
 		catch (LinkageError e) {
 			throw new IOException("Could not load MapR file system. "  +
@@ -66,4 +110,128 @@ public class MapRFsFactory implements FileSystemFactory {
 			throw new IOException("Could not instantiate MapR file system.", t);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  MapR Config Loading
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Retrieves the CLDB locations for the given MapR cluster name.
+	 *
+	 * @param authority
+	 *            the name of the MapR cluster
+	 * @return a list of CLDB locations
+	 * @throws IOException
+	 *             thrown if the CLDB locations for the given MapR cluster name
+	 *             cannot be determined
+	 */
+	private static String[] getCLDBLocations(String authority) throws IOException {
+
+		// Determine the MapR home
+		String maprHome = System.getenv(MAPR_HOME_ENV);
+		if (maprHome == null) {
+			maprHome = DEFAULT_MAPR_HOME;
+		}
+
+		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format(
+				"Trying to retrieve MapR cluster configuration from %s",
+				maprClusterConf));
+		}
+
+		if (!maprClusterConf.exists()) {
+			throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() +
+				"', assuming MapR home is '" + maprHome + "'.");
+		}
+
+		// Read the cluster configuration file, format is specified at
+		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
+
+		try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
+
+			String line;
+			while ((line = br.readLine()) != null) {
+
+				// Normalize the string
+				line = line.trim();
+				line = line.replace('\t', ' ');
+
+				final String[] fields = line.split(" ");
+				if (fields.length < 1) {
+					continue;
+				}
+
+				final String clusterName = fields[0];
+
+				if (!clusterName.equals(authority)) {
+					continue;
+				}
+
+				final List<String> cldbLocations = new ArrayList<>();
+
+				for (int i = 1; i < fields.length; ++i) {
+
+					// Make sure this is not a key-value pair MapR recently
+					// introduced in the file format along with their security
+					// features.
+					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
+						cldbLocations.add(fields[i]);
+					}
+				}
+
+				if (cldbLocations.isEmpty()) {
+					throw new IOException(
+						String.format(
+							"%s contains entry for cluster %s but no CLDB locations.",
+							maprClusterConf, authority));
+				}
+
+				return cldbLocations.toArray(new String[cldbLocations.size()]);
+			}
+
+		}
+
+		throw new IOException(String.format(
+			"Unable to find CLDB locations for cluster %s", authority));
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reflective FS Instantiation
+	// ------------------------------------------------------------------------
+
+	private static void checkMaprFsClassInClassPath() throws IOException {
+		try {
+			Class.forName(MAPR_FS_CLASS_NAME, false, MapRFsFactory.class.getClassLoader());
+		}
+		catch (ClassNotFoundException e) {
+			throw new IOException("Cannot find MapR FS in classpath: " + MAPR_FS_CLASS_NAME, e);
+		}
+	}
+
+	@VisibleForTesting
+	static org.apache.hadoop.fs.FileSystem instantiateMapRFsClass(Object... args) throws IOException {
+		final Class<? extends org.apache.hadoop.fs.FileSystem> fsClazz;
+
+		try {
+			fsClazz = Class
+				.forName(MAPR_FS_CLASS_NAME)
+				.asSubclass(org.apache.hadoop.fs.FileSystem.class);
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Cannot load MapR FS. Class missing in classpath", e);
+		} catch (ClassCastException e) {
+			throw new IOException("Class '" + MAPR_FS_CLASS_NAME + "' is not a subclass of org.apache.hadoop.fs.FileSystem");
+		}
+
+		final Class<?>[] constructorArgs = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new);
+		try {
+			final Constructor<? extends org.apache.hadoop.fs.FileSystem> ctor =
+				fsClazz.getConstructor(constructorArgs);
+
+			return ctor.newInstance(args);
+		} catch (Exception e) {
+			throw new IOException("Cannot instantiate MapR FS class", e);
+		}
+	}
 }
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java
new file mode 100644
index 0000000..b027487
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mapr.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Test class that mocks the MapRFileSystem.
+ */
+public class MapRFileSystem extends org.apache.hadoop.fs.FileSystem {
+
+	@Override
+	public URI getUri() {
+		return URI.create("maprfs:/");
+	}
+
+	@Override
+	public FSDataInputStream open(Path path, int i) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean rename(Path path, Path path1) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean delete(Path path, boolean b) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public FileStatus[] listStatus(Path path) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setWorkingDirectory(Path path) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Path getWorkingDirectory() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public FileStatus getFileStatus(Path path) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/tools/travis/nightly.sh b/tools/travis/nightly.sh
index 1e47633..433b925 100755
--- a/tools/travis/nightly.sh
+++ b/tools/travis/nightly.sh
@@ -37,8 +37,7 @@ mkdir -p $ARTIFACTS_DIR || { echo "FAILURE: cannot create log directory '${ARTIF
 LOG4J_PROPERTIES=${HERE}/../log4j-travis.properties
 
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-# We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
-MVN_COMMON_OPTIONS="-nsu -B -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -Pskip-webui-build -Punsafe-mapr-repo"
+MVN_COMMON_OPTIONS="-nsu -B -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -Pskip-webui-build"
 MVN_COMPILE_OPTIONS="-T1C -DskipTests"
 
 cp tools/travis/splits/* flink-end-to-end-tests
diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh
index 256235b..e7cf94e 100755
--- a/tools/travis_controller.sh
+++ b/tools/travis_controller.sh
@@ -90,8 +90,7 @@ EXIT_CODE=0
 
 # Run actual compile&test steps
 if [ $STAGE == "$STAGE_COMPILE" ]; then
-    # We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
-	MVN="mvn clean install -nsu -Punsafe-mapr-repo -Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests $PROFILE"
+	MVN="mvn clean install -nsu -Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests $PROFILE"
 	$MVN
 	EXIT_CODE=$?
 
diff --git a/tools/travis_watchdog.sh b/tools/travis_watchdog.sh
index b9b2b7d..728bf3b 100755
--- a/tools/travis_watchdog.sh
+++ b/tools/travis_watchdog.sh
@@ -60,9 +60,8 @@ MVN_TEST_MODULES=$(get_test_modules_for_stage ${TEST})
 # -nsu option forbids downloading snapshot artifacts. The only snapshot artifacts we depend are from
 # Flink, which however should all be built locally. see FLINK-7230
 #
-# We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Pskip-webui-build -Punsafe-mapr-repo $MVN_LOGGING_OPTIONS"
+MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Pskip-webui-build $MVN_LOGGING_OPTIONS"
 MVN_COMPILE_OPTIONS="-DskipTests"
 MVN_TEST_OPTIONS="$MVN_LOGGING_OPTIONS -Dflink.tests.with-openssl"