You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/06 10:05:52 UTC

[3/4] flink git commit: [FLINK-7643] [core] Rework FileSystem loading to use factories

http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 275e492..9e12f96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -37,16 +37,20 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Concrete implementation of the {@link FileSystem} base class for the MapR
  * file system. The class contains MapR specific code to initialize the
  * connection to the file system. Apart from that, we code mainly reuses the
  * existing HDFS wrapper code.
  */
+@SuppressWarnings("unused") // is only instantiated via reflection
 public final class MapRFileSystem extends FileSystem {
 
 	/**
@@ -77,21 +81,12 @@ public final class MapRFileSystem extends FileSystem {
 	 */
 	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
 
-	/**
-	 * A Hadoop configuration object used during the file system initialization.
-	 */
-	private final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-
-	/**
-	 * The MapR class containing the implementation of the Hadoop HDFS
-	 * interface.
-	 */
-	private final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
+	// ------------------------------------------------------------------------
 
 	/**
 	 * The MapR implementation of the Hadoop HDFS interface.
 	 */
-	private org.apache.hadoop.fs.FileSystem fs;
+	private final org.apache.hadoop.fs.FileSystem fs;
 
 	/**
 	 * Creates a new MapRFileSystem object to access the MapR file system.
@@ -99,59 +94,36 @@ public final class MapRFileSystem extends FileSystem {
 	 * @throws IOException
 	 *             throw if the required MapR classes cannot be found
 	 */
-	@SuppressWarnings("unchecked")
-	public MapRFileSystem() throws IOException {
+	public MapRFileSystem(URI fsURI) throws IOException {
+		checkNotNull(fsURI, "fsURI");
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format(
-					"Trying to load class %s to access the MapR file system",
-					MAPR_FS_IMPL_CLASS));
-		}
+		LOG.debug("Trying to load class {} to access the MapR file system", MAPR_FS_IMPL_CLASS);
 
+		final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
 		try {
-			this.fsClass = (Class<? extends org.apache.hadoop.fs.FileSystem>) Class
-					.forName(MAPR_FS_IMPL_CLASS);
-		} catch (Exception e) {
+			fsClass = Class.forName(MAPR_FS_IMPL_CLASS).asSubclass(org.apache.hadoop.fs.FileSystem.class);
+		}
+		catch (Exception e) {
 			throw new IOException(
-					String.format(
-							"Cannot find class %s, probably the runtime was not compiled against the MapR Hadoop libraries",
+					String.format("Cannot load MapR File System class '%s'. " +
+							"Please check that the MapR Hadoop libraries are in the classpath.",
 							MAPR_FS_IMPL_CLASS), e);
 		}
-	}
-
-	@Override
-	public Path getWorkingDirectory() {
-
-		return new Path(this.fs.getWorkingDirectory().toUri());
-	}
-
-	public Path getHomeDirectory() {
-		return new Path(this.fs.getHomeDirectory().toUri());
-	}
-
-	@Override
-	public URI getUri() {
 
-		return this.fs.getUri();
-	}
+		LOG.info("Initializing MapR file system for URI {}", fsURI);
 
-	@Override
-	public void initialize(final URI path) throws IOException {
+		final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		final org.apache.hadoop.fs.FileSystem fs;
 
-		if (LOG.isInfoEnabled()) {
-			LOG.info(String.format("Initializing MapR file system for path %s",
-					path.toString()));
-		}
-
-		final String authority = path.getAuthority();
+		final String authority = fsURI.getAuthority();
 		if (authority == null || authority.isEmpty()) {
 
-			// Use the default constructor to instantiate MapR file system
-			// object
+			// Use the default constructor to instantiate MapR file system object
 
 			try {
-				this.fs = this.fsClass.newInstance();
-			} catch (Exception e) {
+				fs = fsClass.newInstance();
+			}
+			catch (Exception e) {
 				throw new IOException(e);
 			}
 		} else {
@@ -161,110 +133,47 @@ public final class MapRFileSystem extends FileSystem {
 			final String[] cldbLocations = getCLDBLocations(authority);
 
 			// Find the appropriate constructor
-			final Constructor<? extends org.apache.hadoop.fs.FileSystem> constructor;
 			try {
-				constructor = this.fsClass.getConstructor(String.class,
-						String[].class);
-			} catch (NoSuchMethodException e) {
-				throw new IOException(e);
-			}
+				final Constructor<? extends org.apache.hadoop.fs.FileSystem> constructor =
+						fsClass.getConstructor(String.class, String[].class);
 
-			// Instantiate the file system object
-			try {
-				this.fs = constructor.newInstance(authority, cldbLocations);
-			} catch (Exception e) {
+				fs = constructor.newInstance(authority, cldbLocations);
+			}
+			catch (InvocationTargetException e) {
+				if (e.getTargetException() instanceof IOException) {
+					throw (IOException) e.getTargetException();
+				} else {
+					throw new IOException(e.getTargetException());
+				}
+			}
+			catch (Exception e) {
 				throw new IOException(e);
 			}
 		}
 
-		this.fs.initialize(path, this.conf);
-	}
-
-	/**
-	 * Retrieves the CLDB locations for the given MapR cluster name.
-	 *
-	 * @param authority
-	 *            the name of the MapR cluster
-	 * @return a list of CLDB locations
-	 * @throws IOException
-	 *             thrown if the CLDB locations for the given MapR cluster name
-	 *             cannot be determined
-	 */
-	private static String[] getCLDBLocations(final String authority)
-			throws IOException {
+		// now initialize the Hadoop File System object
+		fs.initialize(fsURI, conf);
 
-		// Determine the MapR home
-		String maprHome = System.getenv(MAPR_HOME_ENV);
-		if (maprHome == null) {
-			maprHome = DEFAULT_MAPR_HOME;
-		}
-
-		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format(
-					"Trying to retrieve MapR cluster configuration from %s",
-					maprClusterConf));
-		}
-
-		// Read the cluster configuration file, format is specified at
-		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
-		BufferedReader br = null;
-		try {
-			br = new BufferedReader(new FileReader(maprClusterConf));
-
-			String line;
-			while ((line = br.readLine()) != null) {
-
-				// Normalize the string
-				line = line.trim();
-				line = line.replace('\t', ' ');
-
-				final String[] fields = line.split(" ");
-				if (fields == null) {
-					continue;
-				}
-
-				if (fields.length < 1) {
-					continue;
-				}
-
-				final String clusterName = fields[0];
-
-				if (!clusterName.equals(authority)) {
-					continue;
-				}
-
-				final List<String> cldbLocations = new ArrayList<String>();
-
-				for (int i = 1; i < fields.length; ++i) {
-
-					// Make sure this is not a key-value pair MapR recently
-					// introduced in the file format along with their security
-					// features.
-					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
-						cldbLocations.add(fields[i]);
-					}
-				}
+		// all good as it seems
+		this.fs = fs;
+	}
 
-				if (cldbLocations.isEmpty()) {
-					throw new IOException(
-							String.format(
-									"%s contains entry for cluster %s but no CLDB locations.",
-									maprClusterConf, authority));
-				}
+	// ------------------------------------------------------------------------
+	//  file system methods
+	// ------------------------------------------------------------------------
 
-				return cldbLocations.toArray(new String[0]);
-			}
+	@Override
+	public Path getWorkingDirectory() {
+		return new Path(this.fs.getWorkingDirectory().toUri());
+	}
 
-		} finally {
-			if (br != null) {
-				br.close();
-			}
-		}
+	public Path getHomeDirectory() {
+		return new Path(this.fs.getHomeDirectory().toUri());
+	}
 
-		throw new IOException(String.format(
-				"Unable to find CLDB locations for cluster %s", authority));
+	@Override
+	public URI getUri() {
+		return this.fs.getUri();
 	}
 
 	@Override
@@ -315,6 +224,7 @@ public final class MapRFileSystem extends FileSystem {
 		return new HadoopDataInputStream(fdis);
 	}
 
+	@SuppressWarnings("deprecation")
 	@Override
 	public FSDataOutputStream create(final Path f, final boolean overwrite,
 			final int bufferSize, final short replication, final long blockSize)
@@ -376,13 +286,92 @@ public final class MapRFileSystem extends FileSystem {
 	@SuppressWarnings("deprecation")
 	@Override
 	public long getDefaultBlockSize() {
-
 		return this.fs.getDefaultBlockSize();
 	}
 
 	@Override
 	public boolean isDistributedFS() {
-
 		return true;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Retrieves the CLDB locations for the given MapR cluster name.
+	 *
+	 * @param authority
+	 *            the name of the MapR cluster
+	 * @return a list of CLDB locations
+	 * @throws IOException
+	 *             thrown if the CLDB locations for the given MapR cluster name
+	 *             cannot be determined
+	 */
+	private static String[] getCLDBLocations(final String authority) throws IOException {
+
+		// Determine the MapR home
+		String maprHome = System.getenv(MAPR_HOME_ENV);
+		if (maprHome == null) {
+			maprHome = DEFAULT_MAPR_HOME;
+		}
+
+		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format(
+					"Trying to retrieve MapR cluster configuration from %s",
+					maprClusterConf));
+		}
+
+		// Read the cluster configuration file, format is specified at
+		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
+
+		try (BufferedReader br = new BufferedReader(new FileReader(maprClusterConf))) {
+
+			String line;
+			while ((line = br.readLine()) != null) {
+
+				// Normalize the string
+				line = line.trim();
+				line = line.replace('\t', ' ');
+
+				final String[] fields = line.split(" ");
+				if (fields.length < 1) {
+					continue;
+				}
+
+				final String clusterName = fields[0];
+
+				if (!clusterName.equals(authority)) {
+					continue;
+				}
+
+				final List<String> cldbLocations = new ArrayList<>();
+
+				for (int i = 1; i < fields.length; ++i) {
+
+					// Make sure this is not a key-value pair MapR recently
+					// introduced in the file format along with their security
+					// features.
+					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
+						cldbLocations.add(fields[i]);
+					}
+				}
+
+				if (cldbLocations.isEmpty()) {
+					throw new IOException(
+							String.format(
+									"%s contains entry for cluster %s but no CLDB locations.",
+									maprClusterConf, authority));
+				}
+
+				return cldbLocations.toArray(new String[cldbLocations.size()]);
+			}
+
+		}
+
+		throw new IOException(String.format(
+				"Unable to find CLDB locations for cluster %s", authority));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 56c79bb..85a6aed 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2386,7 +2386,7 @@ object JobManager {
     val configuration = GlobalConfiguration.loadConfiguration(configDir)
 
     try {
-      FileSystem.setDefaultScheme(configuration)
+      FileSystem.initialize(configuration)
     }
     catch {
       case e: IOException => {

http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8d1f6f7..558388c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1617,7 +1617,7 @@ object TaskManager {
     }
 
     try {
-      FileSystem.setDefaultScheme(conf)
+      FileSystem.initialize(conf)
     }
     catch {
       case e: IOException => {

http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index e73c684..5f2de33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.util.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -38,8 +39,9 @@ import scala.Tuple2;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.lang.reflect.Field;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.URI;
 import java.util.Iterator;
 
 import static org.junit.Assert.*;
@@ -78,10 +80,6 @@ public class TaskManagerConfigurationTest {
 
 			// validate the configured test host name
 			assertEquals(TEST_HOST_NAME, address._1());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
 		} finally {
 			highAvailabilityServices.closeAndCleanupAllData();
 		}
@@ -164,16 +162,11 @@ public class TaskManagerConfigurationTest {
 			String[] args = new String[] {"--configDir:" + tmpDir};
 			TaskManager.parseArgsAndLoadConfig(args);
 
-			Field f = FileSystem.class.getDeclaredField("defaultScheme");
-			f.setAccessible(true);
-			URI scheme = (URI) f.get(null);
-
-			assertEquals("Default Filesystem Scheme not configured.", scheme, defaultFS);
-		} finally {
-			// reset default FS scheme:
-			Field f = FileSystem.class.getDeclaredField("defaultScheme");
-			f.setAccessible(true);
-			f.set(null, null);
+			assertEquals(defaultFS, FileSystem.getDefaultFsUri());
+		}
+		finally {
+			// reset FS settings
+			FileSystem.initialize(new Configuration());
 		}
 	}
 
@@ -205,18 +198,9 @@ public class TaskManagerConfigurationTest {
 		try {
 			assertNotNull(TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._1());
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 		finally {
 			highAvailabilityServices.closeAndCleanupAllData();
-
-			try {
-				server.close();
-			} catch (IOException e) {
-				// ignore shutdown errors
-			}
+			IOUtils.closeQuietly(server);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index de78393..8ecc371 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -611,10 +611,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			YarnClientApplication yarnApplication,
 			ClusterSpecification clusterSpecification) throws Exception {
 
-		// ------------------ Set default file system scheme -------------------------
+		// ------------------ Initialize the file systems -------------------------
 
 		try {
-			org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+			org.apache.flink.core.fs.FileSystem.initialize(flinkConfiguration);
 		} catch (IOException e) {
 			throw new IOException("Error while setting the default " +
 					"filesystem scheme from configuration.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 25d46fb..cd0053b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -100,7 +100,7 @@ public class YarnTaskExecutorRunner {
 			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
 			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
-			FileSystem.setDefaultScheme(configuration);
+			FileSystem.initialize(configuration);
 
 			// configure local directory
 			String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/536675b0/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
index d94921e..0cf9dc4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -142,7 +142,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
 			throw new IOException("Cannot instantiate YARN's Hadoop file system for " + fsUri, e);
 		}
 
-		this.flinkFileSystem = new HadoopFileSystem(hadoopConf, hadoopFileSystem);
+		this.flinkFileSystem = new HadoopFileSystem(hadoopFileSystem);
 
 		this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
 		this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);