You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/17 15:48:52 UTC

[2/2] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore

[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f29f8057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f29f8057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f29f8057

Branch: refs/heads/master
Commit: f29f80575dac1c7e59dd7074118953b8be26520f
Parents: 3edbb7b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 25 17:19:25 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 17 16:48:29 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    |   5 +
 .../apache/flink/core/fs/FileSystemKind.java    |  40 ++++++++
 .../core/fs/SafetyNetWrapperFileSystem.java     |   5 +
 .../flink/core/fs/local/LocalFileSystem.java    |  10 +-
 .../core/fs/local/LocalFileSystemTest.java      |   7 ++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  47 +++++++++
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |   6 ++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java     | 101 +++++++++++++++++++
 8 files changed, 219 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d66a893..982e496 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -633,6 +633,11 @@ public abstract class FileSystem {
 	 */
 	public abstract boolean isDistributedFS();
 
+	/**
+	 * Gets the {@link FileSystemKind} that describes the characteristics of this file system.
+	 */
+	public abstract FileSystemKind getKind();
+
 	// ------------------------------------------------------------------------
 	//  output directory initialization
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
new file mode 100644
index 0000000..52f58ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration describing whether a {@link FileSystem} is a real file system or an object store.
+ */
+@PublicEvolving
+public enum FileSystemKind {
+
+	/**
+	 * An actual file system, with files and directories.
+	 */
+	FILE_SYSTEM,
+
+	/**
+	 * An object store. Files correspond to objects.
+	 * There are no real directories, but a directory-like structure may be mimicked
+	 * by hierarchical naming of files.
+	 */
+	OBJECT_STORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index a1167dd..e7f43a4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -141,6 +141,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
 	}
 
 	@Override
+	public FileSystemKind getKind() {
+		return unsafeFileSystem.getKind(); // the wrapper reports the kind of the file system it wraps
+	}
+
+	@Override
 	public FileSystem getWrappedDelegate() {
 		return unsafeFileSystem;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index ecfd21c..a96f221 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
 
@@ -283,13 +284,18 @@ public class LocalFileSystem extends FileSystem {
 		return false;
 	}
 
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM; // local storage is an actual file system, not an object store
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Gets the URI that represents the local file system.
 	 * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
 	 * UNIX family platforms.
-	 * 
+	 *
 	 * @return The URI that represents the local file system.
 	 */
 	public static URI getLocalFsURI() {
@@ -298,7 +304,7 @@ public class LocalFileSystem extends FileSystem {
 
 	/**
 	 * Gets the shared instance of this file system.
-	 * 
+	 *
 	 * @return The shared instance of this file system.
 	 */
 	public static LocalFileSystem getSharedInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 2312ee9..96c5269 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.util.FileUtils;
@@ -312,4 +313,10 @@ public class LocalFileSystemTest {
 		assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI())));
 		assertTrue(new File(dstFolder, srcFile.getName()).exists());
 	}
+
+	@Test
+	public void testKind() {
+		final FileSystem fs = FileSystem.getLocalFileSystem();
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind()); // local FS must not be classified as an object store
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 5970c9d..7bc5a0f 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -21,10 +21,12 @@ package org.apache.flink.runtime.fs.hdfs;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Locale;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -36,6 +38,11 @@ public class HadoopFileSystem extends FileSystem {
 	/** The wrapped Hadoop File System. */
 	private final org.apache.hadoop.fs.FileSystem fs;
 
+	/* This field caches the file system kind. It is lazily set because the file system
+	 * URI is lazily initialized. */
+	private FileSystemKind fsKind;
+
+
 	/**
 	 * Wraps the given Hadoop File System object as a Flink File System object.
 	 * The given Hadoop file system object is expected to be initialized already.
@@ -168,4 +175,44 @@ public class HadoopFileSystem extends FileSystem {
 	public boolean isDistributedFS() {
 		return true;
 	}
+
+	@Override
+	public FileSystemKind getKind() {
+		if (fsKind == null) { // not resolved yet: derive the kind from the wrapped Hadoop FS's URI scheme
+			fsKind = getKindForScheme(this.fs.getUri().getScheme()); // NOTE(review): unsynchronized cache write; looks benign if the URI never changes - confirm
+		}
+		return fsKind;
+	}
+
+	/**
+	 * Gets the kind of the file system from its scheme. Matching is case-insensitive and by prefix.
+	 *
+	 * <p>Implementation note: To not break backwards compatibility (especially within the
+	 * Flink 1.3.x line), a file system is only labeled as an object store if we are sure
+	 * about it; otherwise we cause regressions, for example in the performance and cleanup
+	 * handling of checkpoints. Hence only "s3*", "emr*", "http*" and "ftp*" are object stores.
+	 *
+	 * @param scheme the URI scheme to classify; must not be null
+	 * @return {@link FileSystemKind#OBJECT_STORE} for the prefixes above, FILE_SYSTEM otherwise
+	 */
+	static FileSystemKind getKindForScheme(String scheme) {
+		scheme = scheme.toLowerCase(Locale.US);
+
+		if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
+			// the Amazon S3 storage (any scheme with the prefix "s3" or "emr")
+			return FileSystemKind.OBJECT_STORE;
+		}
+		else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+			// file servers instead of file systems
+			// they might actually be consistent, but we have no hard guarantees
+			// currently to rely on that
+			return FileSystemKind.OBJECT_STORE;
+		}
+		else {
+			// the remainder should include hdfs, kosmos, ceph, ...
+			// this also includes federated HDFS (viewfs).
+			return FileSystemKind.FILE_SYSTEM;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 058772c..5aec4a4 100644
--- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.fs.maprfs;
 
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import org.slf4j.Logger;
@@ -172,4 +173,9 @@ public class MapRFileSystem extends HadoopFileSystem {
 		throw new IOException(String.format(
 				"Unable to find CLDB locations for cluster %s", authority));
 	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM; // fixed answer; bypasses the scheme-based lookup of HadoopFileSystem
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
new file mode 100644
index 0000000..69ecdb8
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for extracting the {@link FileSystemKind} from file systems that Flink
+ * accesses through Hadoop's File System interface.
+ *
+ * <p>This class needs to be in this package, because it accesses the package-private
+ * method {@code HadoopFileSystem.getKindForScheme(String)}.
+ */
+public class HdfsKindTest extends TestLogger {
+
+	@Test
+	public void testHdfsKind() throws IOException {
+		final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem();
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+	}
+
+	@Test
+	public void testS3Kind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// S3FileSystem is not in the classpath: skip by returning early (no assertion is run)
+			log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
+			return;
+		}
+
+		final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
+	}
+
+	@Test
+	public void testS3nKind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// NativeS3FileSystem is not in the classpath: skip by returning early (no assertion is run)
+			log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
+			return;
+		}
+
+		final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
+	}
+
+	@Test
+	public void testS3aKind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// S3AFileSystem is not in the classpath: skip by returning early (no assertion is run)
+			log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
+			return;
+		}
+
+		final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
+	}
+
+	@Test
+	public void testS3fileSystemSchemes() {
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS")); // upper case on purpose: the scheme is lower-cased before matching
+	}
+
+	@Test
+	public void testViewFs() {
+		assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs")); // matches no object-store prefix, so the default branch applies
+	}
+}