You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/31 08:19:52 UTC
[flink] 02/02: [FLINK-13499][maprfs] Handle MapR dependency purely through reflection
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8bdcb001fd7bf8582691b5625cbc5b261ff00cb1
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 30 12:00:07 2019 +0200
[FLINK-13499][maprfs] Handle MapR dependency purely through reflection
This allows us to remove the MapR dependency (and the MapR Maven repository) from the module.
The MapR Maven dependency has frequently caused issues.
---
flink-filesystems/flink-mapr-fs/pom.xml | 38 +----
.../flink/runtime/fs/maprfs/MapRFileSystem.java | 181 ---------------------
.../flink/runtime/fs/maprfs/MapRFsFactory.java | 170 ++++++++++++++++++-
.../src/test/java/com/mapr/fs/MapRFileSystem.java | 90 ++++++++++
tools/travis/nightly.sh | 3 +-
tools/travis_controller.sh | 3 +-
tools/travis_watchdog.sh | 3 +-
7 files changed, 265 insertions(+), 223 deletions(-)
diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index e32954c..cbfd86e 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -32,35 +32,6 @@ under the License.
<packaging>jar</packaging>
- <repositories>
- <repository>
- <id>mapr-releases</id>
- <url>https://repository.mapr.com/maven/</url>
- <snapshots><enabled>false</enabled></snapshots>
- <releases><enabled>true</enabled></releases>
- </repository>
- </repositories>
-
- <profiles>
- <profile>
- <id>unsafe-mapr-repo</id>
- <activation>
- <property>
- <name>unsafe-mapr-repo</name>
- </property>
- </activation>
- <repositories>
- <!-- MapR -->
- <repository>
- <id>mapr-releases</id>
- <url>http://repository.mapr.com/maven/</url>
- <snapshots><enabled>false</enabled></snapshots>
- <releases><enabled>true</enabled></releases>
- </repository>
- </repositories>
- </profile>
- </profiles>
-
<dependencies>
<dependency>
@@ -75,13 +46,10 @@ under the License.
<version>${project.version}</version>
</dependency>
- <!-- MapR dependencies as optional dependency, so we can hard depend on this without -->
- <!-- pulling in MapR libraries by default -->
-
<dependency>
- <groupId>com.mapr.hadoop</groupId>
- <artifactId>maprfs</artifactId>
- <version>5.2.1-mapr</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <version>${hadoop.version}-${flink.shaded.version}</version>
<optional>true</optional>
</dependency>
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
deleted file mode 100644
index 5aec4a4..0000000
--- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.maprfs;
-
-import org.apache.flink.core.fs.FileSystemKind;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A MapR file system client for Flink.
- *
- * <p>Internally, this class wraps the {@link org.apache.hadoop.fs.FileSystem} implementation
- * of the MapR file system client.
- */
-public class MapRFileSystem extends HadoopFileSystem {
-
- private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class);
-
- /** Name of the environment variable to determine the location of the MapR
- * installation. */
- private static final String MAPR_HOME_ENV = "MAPR_HOME";
-
- /** The default location of the MapR installation. */
- private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
-
- /** The path relative to the MAPR_HOME where MapR stores how to access the
- * configured clusters. */
- private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a MapRFileSystem for the given URI.
- *
- * @param fsUri The URI describing the file system
- * @throws IOException Thrown if the file system could not be initialized.
- */
- public MapRFileSystem(URI fsUri) throws IOException {
- super(instantiateMapRFileSystem(fsUri));
- }
-
- private static org.apache.hadoop.fs.FileSystem instantiateMapRFileSystem(URI fsUri) throws IOException {
- checkNotNull(fsUri, "fsUri");
-
- final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- final com.mapr.fs.MapRFileSystem fs;
-
- final String authority = fsUri.getAuthority();
- if (authority == null || authority.isEmpty()) {
-
- // Use the default constructor to instantiate MapR file system object
- fs = new com.mapr.fs.MapRFileSystem();
- }
- else {
- // We have an authority, check the MapR cluster configuration to
- // find the CLDB locations.
- final String[] cldbLocations = getCLDBLocations(authority);
- fs = new com.mapr.fs.MapRFileSystem(authority, cldbLocations);
- }
-
- // now initialize the Hadoop File System object
- fs.initialize(fsUri, conf);
-
- return fs;
- }
-
- /**
- * Retrieves the CLDB locations for the given MapR cluster name.
- *
- * @param authority
- * the name of the MapR cluster
- * @return a list of CLDB locations
- * @throws IOException
- * thrown if the CLDB locations for the given MapR cluster name
- * cannot be determined
- */
- private static String[] getCLDBLocations(String authority) throws IOException {
-
- // Determine the MapR home
- String maprHome = System.getenv(MAPR_HOME_ENV);
- if (maprHome == null) {
- maprHome = DEFAULT_MAPR_HOME;
- }
-
- final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Trying to retrieve MapR cluster configuration from %s",
- maprClusterConf));
- }
-
- if (!maprClusterConf.exists()) {
- throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() +
- "', assuming MapR home is '" + maprHome + "'.");
- }
-
- // Read the cluster configuration file, format is specified at
- // http://doc.mapr.com/display/MapR/mapr-clusters.conf
-
- try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
-
- String line;
- while ((line = br.readLine()) != null) {
-
- // Normalize the string
- line = line.trim();
- line = line.replace('\t', ' ');
-
- final String[] fields = line.split(" ");
- if (fields.length < 1) {
- continue;
- }
-
- final String clusterName = fields[0];
-
- if (!clusterName.equals(authority)) {
- continue;
- }
-
- final List<String> cldbLocations = new ArrayList<>();
-
- for (int i = 1; i < fields.length; ++i) {
-
- // Make sure this is not a key-value pair MapR recently
- // introduced in the file format along with their security
- // features.
- if (!fields[i].isEmpty() && !fields[i].contains("=")) {
- cldbLocations.add(fields[i]);
- }
- }
-
- if (cldbLocations.isEmpty()) {
- throw new IOException(
- String.format(
- "%s contains entry for cluster %s but no CLDB locations.",
- maprClusterConf, authority));
- }
-
- return cldbLocations.toArray(new String[cldbLocations.size()]);
- }
-
- }
-
- throw new IOException(String.format(
- "Unable to find CLDB locations for cluster %s", authority));
- }
-
- @Override
- public FileSystemKind getKind() {
- return FileSystemKind.FILE_SYSTEM;
- }
-}
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
index d738198..b89de68 100644
--- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
@@ -18,14 +18,23 @@
package org.apache.flink.runtime.fs.maprfs;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,6 +48,20 @@ public class MapRFsFactory implements FileSystemFactory {
private static final Logger LOG = LoggerFactory.getLogger(MapRFsFactory.class);
+ /** Name of the environment variable to determine the location of the MapR
+ * installation. */
+ private static final String MAPR_HOME_ENV = "MAPR_HOME";
+
+ /** The default location of the MapR installation. */
+ private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
+
+ /** The path relative to the MAPR_HOME where MapR stores how to access the
+ * configured clusters. */
+ private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
+
+ /** Name of the class implementing the MapRFileSystem. */
+ private static final String MAPR_FS_CLASS_NAME = "com.mapr.fs.MapRFileSystem";
+
// ------------------------------------------------------------------------
@Override
@@ -50,10 +73,31 @@ public class MapRFsFactory implements FileSystemFactory {
public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "fsUri");
+ checkMaprFsClassInClassPath();
+
try {
LOG.info("Trying to load and instantiate MapR File System");
- return new MapRFileSystem(fsUri);
+ final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ final org.apache.hadoop.fs.FileSystem fs;
+
+ final String authority = fsUri.getAuthority();
+ if (authority == null || authority.isEmpty()) {
+
+ // Use the default constructor to instantiate MapR file system object
+ fs = instantiateMapRFsClass();
+ }
+ else {
+ // We have an authority, check the MapR cluster configuration to
+ // find the CLDB locations.
+ final String[] cldbLocations = getCLDBLocations(authority);
+ fs = instantiateMapRFsClass(authority, cldbLocations);
+ }
+
+ // now initialize the Hadoop File System object
+ fs.initialize(fsUri, conf);
+
+ return new HadoopFileSystem(fs);
}
catch (LinkageError e) {
throw new IOException("Could not load MapR file system. " +
@@ -66,4 +110,128 @@ public class MapRFsFactory implements FileSystemFactory {
throw new IOException("Could not instantiate MapR file system.", t);
}
}
+
+ // ------------------------------------------------------------------------
+ // MapR Config Loading
+ // ------------------------------------------------------------------------
+
+ /**
+ * Retrieves the CLDB locations for the given MapR cluster name.
+ *
+ * @param authority
+ * the name of the MapR cluster
+ * @return a list of CLDB locations
+ * @throws IOException
+ * thrown if the CLDB locations for the given MapR cluster name
+ * cannot be determined
+ */
+ private static String[] getCLDBLocations(String authority) throws IOException {
+
+ // Determine the MapR home
+ String maprHome = System.getenv(MAPR_HOME_ENV);
+ if (maprHome == null) {
+ maprHome = DEFAULT_MAPR_HOME;
+ }
+
+ final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Trying to retrieve MapR cluster configuration from %s",
+ maprClusterConf));
+ }
+
+ if (!maprClusterConf.exists()) {
+ throw new IOException("Could not find CLDB configuration '" + maprClusterConf.getAbsolutePath() +
+ "', assuming MapR home is '" + maprHome + "'.");
+ }
+
+ // Read the cluster configuration file, format is specified at
+ // http://doc.mapr.com/display/MapR/mapr-clusters.conf
+
+ try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
+
+ String line;
+ while ((line = br.readLine()) != null) {
+
+ // Normalize the string
+ line = line.trim();
+ line = line.replace('\t', ' ');
+
+ final String[] fields = line.split(" ");
+ if (fields.length < 1) {
+ continue;
+ }
+
+ final String clusterName = fields[0];
+
+ if (!clusterName.equals(authority)) {
+ continue;
+ }
+
+ final List<String> cldbLocations = new ArrayList<>();
+
+ for (int i = 1; i < fields.length; ++i) {
+
+ // Make sure this is not a key-value pair MapR recently
+ // introduced in the file format along with their security
+ // features.
+ if (!fields[i].isEmpty() && !fields[i].contains("=")) {
+ cldbLocations.add(fields[i]);
+ }
+ }
+
+ if (cldbLocations.isEmpty()) {
+ throw new IOException(
+ String.format(
+ "%s contains entry for cluster %s but no CLDB locations.",
+ maprClusterConf, authority));
+ }
+
+ return cldbLocations.toArray(new String[cldbLocations.size()]);
+ }
+
+ }
+
+ throw new IOException(String.format(
+ "Unable to find CLDB locations for cluster %s", authority));
+ }
+
+ // ------------------------------------------------------------------------
+ // Reflective FS Instantiation
+ // ------------------------------------------------------------------------
+
+ private static void checkMaprFsClassInClassPath() throws IOException {
+ try {
+ Class.forName(MAPR_FS_CLASS_NAME, false, MapRFsFactory.class.getClassLoader());
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find MapR FS in classpath: " + MAPR_FS_CLASS_NAME, e);
+ }
+ }
+
+ @VisibleForTesting
+ static org.apache.hadoop.fs.FileSystem instantiateMapRFsClass(Object... args) throws IOException {
+ final Class<? extends org.apache.hadoop.fs.FileSystem> fsClazz;
+
+ try {
+ fsClazz = Class
+ .forName(MAPR_FS_CLASS_NAME)
+ .asSubclass(org.apache.hadoop.fs.FileSystem.class);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot load MapR FS. Class missing in classpath", e);
+ } catch (ClassCastException e) {
+ throw new IOException("Class '" + MAPR_FS_CLASS_NAME + "' is not a subclass of org.apache.hadoop.fs.FileSystem");
+ }
+
+ final Class<?>[] constructorArgs = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new);
+ try {
+ final Constructor<? extends org.apache.hadoop.fs.FileSystem> ctor =
+ fsClazz.getConstructor(constructorArgs);
+
+ return ctor.newInstance(args);
+ } catch (Exception e) {
+ throw new IOException("Cannot instantiate MapR FS class", e);
+ }
+ }
}
diff --git a/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java
new file mode 100644
index 0000000..b027487
--- /dev/null
+++ b/flink-filesystems/flink-mapr-fs/src/test/java/com/mapr/fs/MapRFileSystem.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mapr.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Test class that mocks the MapRFileSystem.
+ */
+public class MapRFileSystem extends org.apache.hadoop.fs.FileSystem {
+
+ @Override
+ public URI getUri() {
+ return URI.create("maprfs:/");
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean rename(Path path, Path path1) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean delete(Path path, boolean b) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/tools/travis/nightly.sh b/tools/travis/nightly.sh
index 1e47633..433b925 100755
--- a/tools/travis/nightly.sh
+++ b/tools/travis/nightly.sh
@@ -37,8 +37,7 @@ mkdir -p $ARTIFACTS_DIR || { echo "FAILURE: cannot create log directory '${ARTIF
LOG4J_PROPERTIES=${HERE}/../log4j-travis.properties
MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-# We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
-MVN_COMMON_OPTIONS="-nsu -B -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -Pskip-webui-build -Punsafe-mapr-repo"
+MVN_COMMON_OPTIONS="-nsu -B -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -Pskip-webui-build"
MVN_COMPILE_OPTIONS="-T1C -DskipTests"
cp tools/travis/splits/* flink-end-to-end-tests
diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh
index 256235b..e7cf94e 100755
--- a/tools/travis_controller.sh
+++ b/tools/travis_controller.sh
@@ -90,8 +90,7 @@ EXIT_CODE=0
# Run actual compile&test steps
if [ $STAGE == "$STAGE_COMPILE" ]; then
- # We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
- MVN="mvn clean install -nsu -Punsafe-mapr-repo -Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests $PROFILE"
+ MVN="mvn clean install -nsu -Dflink.convergence.phase=install -Pcheck-convergence -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests $PROFILE"
$MVN
EXIT_CODE=$?
diff --git a/tools/travis_watchdog.sh b/tools/travis_watchdog.sh
index b9b2b7d..728bf3b 100755
--- a/tools/travis_watchdog.sh
+++ b/tools/travis_watchdog.sh
@@ -60,9 +60,8 @@ MVN_TEST_MODULES=$(get_test_modules_for_stage ${TEST})
# -nsu option forbids downloading snapshot artifacts. The only snapshot artifacts we depend are from
# Flink, which however should all be built locally. see FLINK-7230
#
-# We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Pskip-webui-build -Punsafe-mapr-repo $MVN_LOGGING_OPTIONS"
+MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Pskip-webui-build $MVN_LOGGING_OPTIONS"
MVN_COMPILE_OPTIONS="-DskipTests"
MVN_TEST_OPTIONS="$MVN_LOGGING_OPTIONS -Dflink.tests.with-openssl"