You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/28 13:34:27 UTC

[2/3] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore

[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/854b0537
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/854b0537
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/854b0537

Branch: refs/heads/release-1.3
Commit: 854b05376a459a6197e41e141bb28a9befe481ad
Parents: 3569f80
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 25 17:19:25 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 28 15:15:30 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    |   5 +
 .../apache/flink/core/fs/FileSystemKind.java    |  40 ++++++++
 .../core/fs/SafetyNetWrapperFileSystem.java     |   5 +
 .../flink/core/fs/local/LocalFileSystem.java    |   6 ++
 .../core/fs/local/LocalFileSystemTest.java      |   7 ++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java     | 101 +++++++++++++++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  47 +++++++++
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |   7 +-
 8 files changed, 217 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index fab0f4d..e76992d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -659,6 +659,11 @@ public abstract class FileSystem {
 	 */
 	public abstract boolean isDistributedFS();
 
+	/**
+	 * Gets the {@link FileSystemKind} of this file system, e.g., file system or object store.
+	 */
+	public abstract FileSystemKind getKind();
+
 	// ------------------------------------------------------------------------
 	//  output directory initialization
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
new file mode 100644
index 0000000..52f58ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration defining the kind and characteristics of a {@link FileSystem}, see {@link FileSystem#getKind()}.
+ */
+@PublicEvolving
+public enum FileSystemKind {
+
+	/**
+	 * An actual file system, with files and directories.
+	 */
+	FILE_SYSTEM,
+
+	/**
+	 * An object store. Files correspond to objects.
+	 * There are no real directories, but a directory-like structure may be mimicked
+	 * by hierarchical naming of files.
+	 */
+	OBJECT_STORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 1dacafd..63a263a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -146,6 +146,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
 	}
 
 	@Override
+	public FileSystemKind getKind() {
+		return unsafeFileSystem.getKind(); // the wrapper reports the kind of the wrapped file system
+	}
+
+	@Override
 	public FileSystem getWrappedDelegate() {
 		return unsafeFileSystem;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 0e3e9f3..579f856 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
 
@@ -289,4 +290,9 @@ public class LocalFileSystem extends FileSystem {
 	public boolean isDistributedFS() {
 		return false;
 	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM; // always an actual file system with files and directories
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 2312ee9..96c5269 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.util.FileUtils;
@@ -312,4 +313,10 @@ public class LocalFileSystemTest {
 		assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI())));
 		assertTrue(new File(dstFolder, srcFile.getName()).exists());
 	}
+
+	@Test
+	public void testKind() {
+		final FileSystem fs = FileSystem.getLocalFileSystem();
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind()); // must never be classified as an object store
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
new file mode 100644
index 0000000..69ecdb8
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for extracting the {@link FileSystemKind} from file systems that Flink
+ * accesses through Hadoop's File System interface.
+ *
+ * <p>This class needs to be in this package, because it accesses package private methods
+ * from the HDFS file system wrapper class.
+ */
+public class HdfsKindTest extends TestLogger {
+
+	@Test
+	public void testHdfsKind() throws IOException {
+		final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem();
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+	}
+
+	@Test
+	public void testS3Kind() throws IOException {
+		assumeTrue("Skipping test because the S3 file system is not in the class path",
+				isOnClassPath("org.apache.hadoop.fs.s3.S3FileSystem"));
+
+		final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
+	}
+
+	@Test
+	public void testS3nKind() throws IOException {
+		assumeTrue("Skipping test because the Native S3 file system is not in the class path",
+				isOnClassPath("org.apache.hadoop.fs.s3native.NativeS3FileSystem"));
+
+		final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
+	}
+
+	@Test
+	public void testS3aKind() throws IOException {
+		assumeTrue("Skipping test because the S3AFileSystem is not in the class path",
+				isOnClassPath("org.apache.hadoop.fs.s3a.S3AFileSystem"));
+
+		final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
+	}
+
+	@Test
+	public void testS3fileSystemSchemes() {
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS"));
+	}
+
+	@Test
+	public void testViewFs() {
+		assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs"));
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether the given class can be loaded, i.e., whether it is in the class path.
+	 */
+	private static boolean isOnClassPath(String className) {
+		try {
+			Class.forName(className);
+			return true;
+		} catch (ClassNotFoundException ignored) {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index d539b2a..b4f4609 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.hadoop.conf.Configuration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +36,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.util.Locale;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -60,6 +63,10 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 	private final org.apache.hadoop.fs.FileSystem fs;
 
+	/* This field caches the file system kind. It is lazily set in getKind(), because the file
+	 * system URI is lazily initialized. */
+	private FileSystemKind fsKind;
+
 
 	/**
 	 * Creates a new DistributedFileSystem object to access HDFS, based on a class name
@@ -464,6 +471,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	}
 
 	@Override
+	public FileSystemKind getKind() {
+		if (fsKind == null) { // derived lazily from the URI scheme; unsynchronized, so racing callers may each compute it
+			fsKind = getKindForScheme(this.fs.getUri().getScheme());
+		}
+		return fsKind;
+	}
+
+	@Override
 	public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
 		Configuration hadoopConf = getHadoopConfiguration();
 		Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
@@ -478,4 +493,36 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		}
 		return clazz;
 	}
+
+	/**
+	 * Gets the kind of the file system from its URI scheme (case-insensitive).
+	 *
+	 * <p>Implementation note: Initially, especially within the Flink 1.3.x line
+	 * (in order to not break backwards compatibility), we only label file systems as
+	 * object stores if we are sure about it. Otherwise, we cause regressions, for example
+	 * in the performance and cleanup handling of checkpoints. Everything else, including
+	 * a missing ({@code null}) scheme, is treated as a proper file system.
+	 *
+	 * @param scheme The URI scheme, for example "hdfs" or "s3a"; may be null.
+	 * @return The kind of the file system that the scheme refers to.
+	 */
+	static FileSystemKind getKindForScheme(String scheme) {
+		if (scheme == null) {
+			// a missing scheme gives no evidence of an object store, so stay conservative
+			return FileSystemKind.FILE_SYSTEM;
+		}
+
+		final String lowerCaseScheme = scheme.toLowerCase(Locale.US);
+		if (lowerCaseScheme.startsWith("s3") || lowerCaseScheme.startsWith("emr")) {
+			// the Amazon S3 storage (s3, s3n, s3a, EMRFS)
+			return FileSystemKind.OBJECT_STORE;
+		} else if (lowerCaseScheme.startsWith("http") || lowerCaseScheme.startsWith("ftp")) {
+			// file servers: they might be consistent, but we have no hard guarantees for that
+			return FileSystemKind.OBJECT_STORE;
+		} else {
+			// the remainder includes hdfs, kosmos, ceph, ..., and federated HDFS (viewfs)
+			return FileSystemKind.FILE_SYSTEM;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 57eea6f..1ec34bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
 import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
@@ -381,7 +382,11 @@ public final class MapRFileSystem extends FileSystem {
 
 	@Override
 	public boolean isDistributedFS() {
-
 		return true;
 	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM; // MapR-FS is reported as an actual file system, not an object store
+	}
 }