You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 10:05:52 UTC
[3/4] flink git commit: [FLINK-7643] [core] Rework FileSystem loading to use factories
http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 275e492..9e12f96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -37,16 +37,20 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Concrete implementation of the {@link FileSystem} base class for the MapR
* file system. The class contains MapR specific code to initialize the
* connection to the file system. Apart from that, we code mainly reuses the
* existing HDFS wrapper code.
*/
+@SuppressWarnings("unused") // is only instantiated via reflection
public final class MapRFileSystem extends FileSystem {
/**
@@ -77,21 +81,12 @@ public final class MapRFileSystem extends FileSystem {
*/
private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
- /**
- * A Hadoop configuration object used during the file system initialization.
- */
- private final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-
- /**
- * The MapR class containing the implementation of the Hadoop HDFS
- * interface.
- */
- private final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
+ // ------------------------------------------------------------------------
/**
* The MapR implementation of the Hadoop HDFS interface.
*/
- private org.apache.hadoop.fs.FileSystem fs;
+ private final org.apache.hadoop.fs.FileSystem fs;
/**
* Creates a new MapRFileSystem object to access the MapR file system.
@@ -99,59 +94,36 @@ public final class MapRFileSystem extends FileSystem {
* @throws IOException
* throw if the required MapR classes cannot be found
*/
- @SuppressWarnings("unchecked")
- public MapRFileSystem() throws IOException {
+ public MapRFileSystem(URI fsURI) throws IOException {
+ checkNotNull(fsURI, "fsURI");
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Trying to load class %s to access the MapR file system",
- MAPR_FS_IMPL_CLASS));
- }
+ LOG.debug("Trying to load class {} to access the MapR file system", MAPR_FS_IMPL_CLASS);
+ final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
try {
- this.fsClass = (Class<? extends org.apache.hadoop.fs.FileSystem>) Class
- .forName(MAPR_FS_IMPL_CLASS);
- } catch (Exception e) {
+ fsClass = Class.forName(MAPR_FS_IMPL_CLASS).asSubclass(org.apache.hadoop.fs.FileSystem.class);
+ }
+ catch (Exception e) {
throw new IOException(
- String.format(
- "Cannot find class %s, probably the runtime was not compiled against the MapR Hadoop libraries",
+ String.format("Cannot load MapR File System class '%s'. " +
+ "Please check that the MapR Hadoop libraries are in the classpath.",
MAPR_FS_IMPL_CLASS), e);
}
- }
-
- @Override
- public Path getWorkingDirectory() {
-
- return new Path(this.fs.getWorkingDirectory().toUri());
- }
-
- public Path getHomeDirectory() {
- return new Path(this.fs.getHomeDirectory().toUri());
- }
-
- @Override
- public URI getUri() {
- return this.fs.getUri();
- }
+ LOG.info("Initializing MapR file system for URI {}", fsURI);
- @Override
- public void initialize(final URI path) throws IOException {
+ final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+ final org.apache.hadoop.fs.FileSystem fs;
- if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Initializing MapR file system for path %s",
- path.toString()));
- }
-
- final String authority = path.getAuthority();
+ final String authority = fsURI.getAuthority();
if (authority == null || authority.isEmpty()) {
- // Use the default constructor to instantiate MapR file system
- // object
+ // Use the default constructor to instantiate MapR file system object
try {
- this.fs = this.fsClass.newInstance();
- } catch (Exception e) {
+ fs = fsClass.newInstance();
+ }
+ catch (Exception e) {
throw new IOException(e);
}
} else {
@@ -161,110 +133,47 @@ public final class MapRFileSystem extends FileSystem {
final String[] cldbLocations = getCLDBLocations(authority);
// Find the appropriate constructor
- final Constructor<? extends org.apache.hadoop.fs.FileSystem> constructor;
try {
- constructor = this.fsClass.getConstructor(String.class,
- String[].class);
- } catch (NoSuchMethodException e) {
- throw new IOException(e);
- }
+ final Constructor<? extends org.apache.hadoop.fs.FileSystem> constructor =
+ fsClass.getConstructor(String.class, String[].class);
- // Instantiate the file system object
- try {
- this.fs = constructor.newInstance(authority, cldbLocations);
- } catch (Exception e) {
+ fs = constructor.newInstance(authority, cldbLocations);
+ }
+ catch (InvocationTargetException e) {
+ if (e.getTargetException() instanceof IOException) {
+ throw (IOException) e.getTargetException();
+ } else {
+ throw new IOException(e.getTargetException());
+ }
+ }
+ catch (Exception e) {
throw new IOException(e);
}
}
- this.fs.initialize(path, this.conf);
- }
-
- /**
- * Retrieves the CLDB locations for the given MapR cluster name.
- *
- * @param authority
- * the name of the MapR cluster
- * @return a list of CLDB locations
- * @throws IOException
- * thrown if the CLDB locations for the given MapR cluster name
- * cannot be determined
- */
- private static String[] getCLDBLocations(final String authority)
- throws IOException {
+ // now initialize the Hadoop File System object
+ fs.initialize(fsURI, conf);
- // Determine the MapR home
- String maprHome = System.getenv(MAPR_HOME_ENV);
- if (maprHome == null) {
- maprHome = DEFAULT_MAPR_HOME;
- }
-
- final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Trying to retrieve MapR cluster configuration from %s",
- maprClusterConf));
- }
-
- // Read the cluster configuration file, format is specified at
- // http://doc.mapr.com/display/MapR/mapr-clusters.conf
- BufferedReader br = null;
- try {
- br = new BufferedReader(new FileReader(maprClusterConf));
-
- String line;
- while ((line = br.readLine()) != null) {
-
- // Normalize the string
- line = line.trim();
- line = line.replace('\t', ' ');
-
- final String[] fields = line.split(" ");
- if (fields == null) {
- continue;
- }
-
- if (fields.length < 1) {
- continue;
- }
-
- final String clusterName = fields[0];
-
- if (!clusterName.equals(authority)) {
- continue;
- }
-
- final List<String> cldbLocations = new ArrayList<String>();
-
- for (int i = 1; i < fields.length; ++i) {
-
- // Make sure this is not a key-value pair MapR recently
- // introduced in the file format along with their security
- // features.
- if (!fields[i].isEmpty() && !fields[i].contains("=")) {
- cldbLocations.add(fields[i]);
- }
- }
+ // all good as it seems
+ this.fs = fs;
+ }
- if (cldbLocations.isEmpty()) {
- throw new IOException(
- String.format(
- "%s contains entry for cluster %s but no CLDB locations.",
- maprClusterConf, authority));
- }
+ // ------------------------------------------------------------------------
+ // file system methods
+ // ------------------------------------------------------------------------
- return cldbLocations.toArray(new String[0]);
- }
+ @Override
+ public Path getWorkingDirectory() {
+ return new Path(this.fs.getWorkingDirectory().toUri());
+ }
- } finally {
- if (br != null) {
- br.close();
- }
- }
+ public Path getHomeDirectory() {
+ return new Path(this.fs.getHomeDirectory().toUri());
+ }
- throw new IOException(String.format(
- "Unable to find CLDB locations for cluster %s", authority));
+ @Override
+ public URI getUri() {
+ return this.fs.getUri();
}
@Override
@@ -315,6 +224,7 @@ public final class MapRFileSystem extends FileSystem {
return new HadoopDataInputStream(fdis);
}
+ @SuppressWarnings("deprecation")
@Override
public FSDataOutputStream create(final Path f, final boolean overwrite,
final int bufferSize, final short replication, final long blockSize)
@@ -376,13 +286,92 @@ public final class MapRFileSystem extends FileSystem {
@SuppressWarnings("deprecation")
@Override
public long getDefaultBlockSize() {
-
return this.fs.getDefaultBlockSize();
}
@Override
public boolean isDistributedFS() {
-
return true;
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Retrieves the CLDB locations for the given MapR cluster name.
+ *
+ * @param authority
+ * the name of the MapR cluster
+ * @return a list of CLDB locations
+ * @throws IOException
+ * thrown if the CLDB locations for the given MapR cluster name
+ * cannot be determined
+ */
+ private static String[] getCLDBLocations(final String authority) throws IOException {
+
+ // Determine the MapR home
+ String maprHome = System.getenv(MAPR_HOME_ENV);
+ if (maprHome == null) {
+ maprHome = DEFAULT_MAPR_HOME;
+ }
+
+ final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Trying to retrieve MapR cluster configuration from %s",
+ maprClusterConf));
+ }
+
+ // Read the cluster configuration file, format is specified at
+ // http://doc.mapr.com/display/MapR/mapr-clusters.conf
+
+ try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
+
+ String line;
+ while ((line = br.readLine()) != null) {
+
+ // Normalize the string
+ line = line.trim();
+ line = line.replace('\t', ' ');
+
+ final String[] fields = line.split(" ");
+ if (fields.length < 1) {
+ continue;
+ }
+
+ final String clusterName = fields[0];
+
+ if (!clusterName.equals(authority)) {
+ continue;
+ }
+
+ final List<String> cldbLocations = new ArrayList<>();
+
+ for (int i = 1; i < fields.length; ++i) {
+
+ // Make sure this is not a key-value pair MapR recently
+ // introduced in the file format along with their security
+ // features.
+ if (!fields[i].isEmpty() && !fields[i].contains("=")) {
+ cldbLocations.add(fields[i]);
+ }
+ }
+
+ if (cldbLocations.isEmpty()) {
+ throw new IOException(
+ String.format(
+ "%s contains entry for cluster %s but no CLDB locations.",
+ maprClusterConf, authority));
+ }
+
+ return cldbLocations.toArray(new String[cldbLocations.size()]);
+ }
+
+ }
+
+ throw new IOException(String.format(
+ "Unable to find CLDB locations for cluster %s", authority));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 56c79bb..85a6aed 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2386,7 +2386,7 @@ object JobManager {
val configuration = GlobalConfiguration.loadConfiguration(configDir)
try {
- FileSystem.setDefaultScheme(configuration)
+ FileSystem.initialize(configuration)
}
catch {
case e: IOException => {
http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8d1f6f7..558388c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1617,7 +1617,7 @@ object TaskManager {
}
try {
- FileSystem.setDefaultScheme(conf)
+ FileSystem.initialize(conf)
}
catch {
case e: IOException => {
http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index e73c684..5f2de33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.util.IOUtils;
import org.junit.Rule;
import org.junit.Test;
@@ -38,8 +39,9 @@ import scala.Tuple2;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
-import java.lang.reflect.Field;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.URI;
import java.util.Iterator;
import static org.junit.Assert.*;
@@ -78,10 +80,6 @@ public class TaskManagerConfigurationTest {
// validate the configured test host name
assertEquals(TEST_HOST_NAME, address._1());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
} finally {
highAvailabilityServices.closeAndCleanupAllData();
}
@@ -164,16 +162,11 @@ public class TaskManagerConfigurationTest {
String[] args = new String[] {"--configDir:" + tmpDir};
TaskManager.parseArgsAndLoadConfig(args);
- Field f = FileSystem.class.getDeclaredField("defaultScheme");
- f.setAccessible(true);
- URI scheme = (URI) f.get(null);
-
- assertEquals("Default Filesystem Scheme not configured.", scheme, defaultFS);
- } finally {
- // reset default FS scheme:
- Field f = FileSystem.class.getDeclaredField("defaultScheme");
- f.setAccessible(true);
- f.set(null, null);
+ assertEquals(defaultFS, FileSystem.getDefaultFsUri());
+ }
+ finally {
+ // reset FS settings
+ FileSystem.initialize(new Configuration());
}
}
@@ -205,18 +198,9 @@ public class TaskManagerConfigurationTest {
try {
assertNotNull(TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._1());
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
finally {
highAvailabilityServices.closeAndCleanupAllData();
-
- try {
- server.close();
- } catch (IOException e) {
- // ignore shutdown errors
- }
+ IOUtils.closeQuietly(server);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index de78393..8ecc371 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -611,10 +611,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
- // ------------------ Set default file system scheme -------------------------
+ // ------------------ Initialize the file systems -------------------------
try {
- org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+ org.apache.flink.core.fs.FileSystem.initialize(flinkConfiguration);
} catch (IOException e) {
throw new IOException("Error while setting the default " +
"filesystem scheme from configuration.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 25d46fb..cd0053b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -100,7 +100,7 @@ public class YarnTaskExecutorRunner {
LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
- FileSystem.setDefaultScheme(configuration);
+ FileSystem.initialize(configuration);
// configure local directory
String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
index d94921e..0cf9dc4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -142,7 +142,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
throw new IOException("Cannot instantiate YARN's Hadoop file system for " + fsUri, e);
}
- this.flinkFileSystem = new HadoopFileSystem(hadoopConf, hadoopFileSystem);
+ this.flinkFileSystem = new HadoopFileSystem(hadoopFileSystem);
this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);