You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/28 13:34:27 UTC
[2/3] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore
[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/854b0537
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/854b0537
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/854b0537
Branch: refs/heads/release-1.3
Commit: 854b05376a459a6197e41e141bb28a9befe481ad
Parents: 3569f80
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 25 17:19:25 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 28 15:15:30 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/core/fs/FileSystem.java | 5 +
.../apache/flink/core/fs/FileSystemKind.java | 40 ++++++++
.../core/fs/SafetyNetWrapperFileSystem.java | 5 +
.../flink/core/fs/local/LocalFileSystem.java | 6 ++
.../core/fs/local/LocalFileSystemTest.java | 7 ++
.../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++++++++++++++++++
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 47 +++++++++
.../flink/runtime/fs/maprfs/MapRFileSystem.java | 7 +-
8 files changed, 217 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index fab0f4d..e76992d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -659,6 +659,11 @@ public abstract class FileSystem {
*/
public abstract boolean isDistributedFS();
+ /**
+ * Gets a description of the characteristics of this file system.
+ */
+ public abstract FileSystemKind getKind();
+
// ------------------------------------------------------------------------
// output directory initialization
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
new file mode 100644
index 0000000..52f58ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration defining the kind and characteristics of a {@link FileSystem}.
+ */
+@PublicEvolving
+public enum FileSystemKind {
+
+ /**
+ * An actual file system, with files and directories.
+ */
+ FILE_SYSTEM,
+
+ /**
+ * An object store. Files correspond to objects.
+ * There are not really directories, but a directory-like structure may be mimicked
+ * by hierarchical naming of files.
+ */
+ OBJECT_STORE
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 1dacafd..63a263a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -146,6 +146,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
}
@Override
+ public FileSystemKind getKind() {
+ return unsafeFileSystem.getKind();
+ }
+
+ @Override
public FileSystem getWrappedDelegate() {
return unsafeFileSystem;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 0e3e9f3..579f856 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.OperatingSystem;
@@ -289,4 +290,9 @@ public class LocalFileSystem extends FileSystem {
public boolean isDistributedFS() {
return false;
}
+
+ @Override
+ public FileSystemKind getKind() {
+ return FileSystemKind.FILE_SYSTEM;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 2312ee9..96c5269 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.FileUtils;
@@ -312,4 +313,10 @@ public class LocalFileSystemTest {
assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI())));
assertTrue(new File(dstFolder, srcFile.getName()).exists());
}
+
+ @Test
+ public void testKind() {
+ final FileSystem fs = FileSystem.getLocalFileSystem();
+ assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
new file mode 100644
index 0000000..69ecdb8
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for extracting the {@link FileSystemKind} from file systems that Flink
+ * accesses through Hadoop's File System interface.
+ *
+ * <p>This class needs to be in this package, because it accesses package-private methods
+ * from the HDFS file system wrapper class.
+ */
+public class HdfsKindTest extends TestLogger {
+
+ @Test
+ public void testHdfsKind() throws IOException {
+ final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem();
+ assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+ }
+
+ @Test
+ public void testS3Kind() throws IOException {
+ try {
+ Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+ } catch (ClassNotFoundException ignored) {
+ // not in the classpath, cannot run this test
+ log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
+ return;
+ }
+
+ final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+ assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
+ }
+
+ @Test
+ public void testS3nKind() throws IOException {
+ try {
+ Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+ } catch (ClassNotFoundException ignored) {
+ // not in the classpath, cannot run this test
+ log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
+ return;
+ }
+
+ final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+ assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
+ }
+
+ @Test
+ public void testS3aKind() throws IOException {
+ try {
+ Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+ } catch (ClassNotFoundException ignored) {
+ // not in the classpath, cannot run this test
+ log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
+ return;
+ }
+
+ final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+ assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
+ }
+
+ @Test
+ public void testS3fileSystemSchemes() {
+ assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3"));
+ assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n"));
+ assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a"));
+ assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS"));
+ }
+
+ @Test
+ public void testViewFs() {
+ assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index d539b2a..b4f4609 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.conf.Configuration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +36,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.UnknownHostException;
+import java.util.Locale;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,6 +63,10 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
private final org.apache.hadoop.fs.FileSystem fs;
+ /* This field caches the file system kind. It is lazily set because the file system
+ * URL is lazily initialized. */
+ private FileSystemKind fsKind;
+
/**
* Creates a new DistributedFileSystem object to access HDFS, based on a class name
@@ -464,6 +471,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
@Override
+ public FileSystemKind getKind() {
+ if (fsKind == null) {
+ fsKind = getKindForScheme(this.fs.getUri().getScheme());
+ }
+ return fsKind;
+ }
+
+ @Override
public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
Configuration hadoopConf = getHadoopConfiguration();
Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
@@ -478,4 +493,36 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
return clazz;
}
+
+ /**
+ * Gets the kind of the file system from its scheme.
+ *
+ * <p>Implementation note: Initially, especially within the Flink 1.3.x line
+ * (in order to not break backwards compatibility), we must only label file systems
+ * as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
+ * Otherwise, we cause regressions, for example in the performance and cleanup handling
+ * of checkpoints.
+ * For that reason, we initially mark only schemes known to be object stores (or to give no
+ * hard consistency guarantees) as 'object stores', and leave the others as 'file systems'.
+ */
+ static FileSystemKind getKindForScheme(String scheme) {
+ scheme = scheme.toLowerCase(Locale.US);
+
+ if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
+ // Amazon S3 schemes (s3, s3n, s3a) and EMR file system schemes (e.g. emrfs)
+ return FileSystemKind.OBJECT_STORE;
+ }
+ else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+ // file servers instead of file systems
+ // they might actually be consistent, but we have no hard guarantees
+ // currently to rely on that
+ return FileSystemKind.OBJECT_STORE;
+ }
+ else {
+ // the remainder should include hdfs, kosmos, ceph, ...
+ // this also includes federated HDFS (viewfs).
+ return FileSystemKind.FILE_SYSTEM;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 57eea6f..1ec34bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
@@ -381,7 +382,11 @@ public final class MapRFileSystem extends FileSystem {
@Override
public boolean isDistributedFS() {
-
return true;
}
+
+ @Override
+ public FileSystemKind getKind() {
+ return FileSystemKind.FILE_SYSTEM;
+ }
}