You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/28 13:34:26 UTC

[1/3] flink git commit: [FLINK-6842] [runtime] Uncomment and activate code in HadoopFileSystem

Repository: flink
Updated Branches:
  refs/heads/release-1.3 d0a9fe013 -> 0225db288


[FLINK-6842] [runtime] Uncomment and activate code in HadoopFileSystem

This closes #4219


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3569f804
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3569f804
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3569f804

Branch: refs/heads/release-1.3
Commit: 3569f804a0fde6c1a815147968773d1fea32b7e0
Parents: d0a9fe0
Author: zhangminglei <zm...@163.com>
Authored: Thu Jul 6 23:01:54 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 28 15:01:31 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java      | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3569f804/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 1371d21..d539b2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -466,15 +466,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	@Override
 	public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
 		Configuration hadoopConf = getHadoopConfiguration();
-		Class<? extends org.apache.hadoop.fs.FileSystem> clazz;
-		// We can activate this block once we drop Hadoop1 support (only hd2 has the getFileSystemClass-method)
-//		try {
-//			clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
-//		} catch (IOException e) {
-//			LOG.info("Flink could not load the Hadoop File system implementation for scheme "+scheme);
-//			return null;
-//		}
-		clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
+		Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
+		try {
+			clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
+		} catch (IOException e) {
+			LOG.info("Flink could not load the Hadoop File system implementation for scheme " + scheme);
+		}
 
 		if(clazz != null && LOG.isDebugEnabled()) {
 			LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);


[3/3] flink git commit: [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores

Posted by se...@apache.org.
[FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores

This closes #4397


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0225db28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0225db28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0225db28

Branch: refs/heads/release-1.3
Commit: 0225db288661846edce25e12d457fb25dfa87827
Parents: 854b053
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 25 17:26:38 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 28 15:21:42 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/state/filesystem/FileStateHandle.java   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0225db28/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index bdf3f42..7655f0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.FileUtils;
@@ -77,14 +78,15 @@ public class FileStateHandle implements StreamStateHandle {
 	 */
 	@Override
 	public void discardState() throws Exception {
-
 		FileSystem fs = getFileSystem();
 
 		fs.delete(filePath, false);
 
-		try {
-			FileUtils.deletePathIfEmpty(fs, filePath.getParent());
-		} catch (Exception ignored) {}
+		if (fs.getKind() == FileSystemKind.FILE_SYSTEM) {
+			try {
+				FileUtils.deletePathIfEmpty(fs, filePath.getParent());
+			} catch (Exception ignored) {}
+		}
 	}
 
 	/**


[2/3] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore

Posted by se...@apache.org.
[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/854b0537
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/854b0537
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/854b0537

Branch: refs/heads/release-1.3
Commit: 854b05376a459a6197e41e141bb28a9befe481ad
Parents: 3569f80
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 25 17:19:25 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 28 15:15:30 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    |   5 +
 .../apache/flink/core/fs/FileSystemKind.java    |  40 ++++++++
 .../core/fs/SafetyNetWrapperFileSystem.java     |   5 +
 .../flink/core/fs/local/LocalFileSystem.java    |   6 ++
 .../core/fs/local/LocalFileSystemTest.java      |   7 ++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java     | 101 +++++++++++++++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  47 +++++++++
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |   7 +-
 8 files changed, 217 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index fab0f4d..e76992d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -659,6 +659,11 @@ public abstract class FileSystem {
 	 */
 	public abstract boolean isDistributedFS();
 
+	/**
+	 * Gets a description of the characteristics of this file system.
+	 */
+	public abstract FileSystemKind getKind();
+
 	// ------------------------------------------------------------------------
 	//  output directory initialization
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
new file mode 100644
index 0000000..52f58ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration defining the kind and characteristics of a {@link FileSystem}.
+ */
+@PublicEvolving
+public enum FileSystemKind {
+
+	/**
+	 * An actual file system, with files and directories.
+	 */
+	FILE_SYSTEM,
+
+	/**
+	 * An Object store. Files correspond to objects.
+	 * There are not really directories, but a directory-like structure may be mimicked
+	 * by hierarchical naming of files.
+	 */
+	OBJECT_STORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 1dacafd..63a263a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -146,6 +146,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
 	}
 
 	@Override
+	public FileSystemKind getKind() {
+		return unsafeFileSystem.getKind();
+	}
+
+	@Override
 	public FileSystem getWrappedDelegate() {
 		return unsafeFileSystem;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 0e3e9f3..579f856 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
 
@@ -289,4 +290,9 @@ public class LocalFileSystem extends FileSystem {
 	public boolean isDistributedFS() {
 		return false;
 	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 2312ee9..96c5269 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.util.FileUtils;
@@ -312,4 +313,10 @@ public class LocalFileSystemTest {
 		assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI())));
 		assertTrue(new File(dstFolder, srcFile.getName()).exists());
 	}
+
+	@Test
+	public void testKind() {
+		final FileSystem fs = FileSystem.getLocalFileSystem();
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
new file mode 100644
index 0000000..69ecdb8
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for extracting the {@link FileSystemKind} from file systems that Flink
+ * accesses through Hadoop's File System interface.
+ *
+ * <p>This class needs to be in this package, because it accesses package private methods
+ * from the HDFS file system wrapper class.
+ */
+public class HdfsKindTest extends TestLogger {
+
+	@Test
+	public void testHdfsKind() throws IOException {
+		final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem();
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+	}
+
+	@Test
+	public void testS3Kind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// not in the classpath, cannot run this test
+			log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
+			return;
+		}
+
+		final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
+	}
+
+	@Test
+	public void testS3nKind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// not in the classpath, cannot run this test
+			log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
+			return;
+		}
+
+		final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
+	}
+
+	@Test
+	public void testS3aKind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// not in the classpath, cannot run this test
+			log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
+			return;
+		}
+
+		final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
+	}
+
+	@Test
+	public void testS3fileSystemSchemes() {
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS"));
+	}
+
+	@Test
+	public void testViewFs() {
+		assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index d539b2a..b4f4609 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.hadoop.conf.Configuration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +36,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.util.Locale;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -60,6 +63,10 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 	private final org.apache.hadoop.fs.FileSystem fs;
 
+	/* This field caches the file system kind. It is lazily set because the file system
+	* URL is lazily initialized. */
+	private FileSystemKind fsKind;
+
 
 	/**
 	 * Creates a new DistributedFileSystem object to access HDFS, based on a class name
@@ -464,6 +471,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	}
 
 	@Override
+	public FileSystemKind getKind() {
+		if (fsKind == null) {
+			fsKind = getKindForScheme(this.fs.getUri().getScheme());
+		}
+		return fsKind;
+	}
+
+	@Override
 	public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
 		Configuration hadoopConf = getHadoopConfiguration();
 		Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
@@ -478,4 +493,36 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		}
 		return clazz;
 	}
+
+	/**
+	 * Gets the kind of the file system from its scheme.
+	 *
+	 * <p>Implementation note: Initially, especially within the Flink 1.3.x line
+	 * (in order to not break backwards compatibility), we must only label file systems
+	 * as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
+	 * Otherwise, we cause regression for example in the performance and cleanup handling
+	 * of checkpoints.
+	 * For that reason, we initially mark some filesystems as 'eventually consistent' or
+	 * as 'object stores', and leave the others as 'consistent file systems'.
+	 */
+	static FileSystemKind getKindForScheme(String scheme) {
+		scheme = scheme.toLowerCase(Locale.US);
+
+		if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
+			// the Amazon S3 storage
+			return FileSystemKind.OBJECT_STORE;
+		}
+		else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+			// file servers instead of file systems
+			// they might actually be consistent, but we have no hard guarantees
+			// currently to rely on that
+			return FileSystemKind.OBJECT_STORE;
+		}
+		else {
+			// the remainder should include hdfs, kosmos, ceph, ...
+			// this also includes federated HDFS (viewfs).
+			return FileSystemKind.FILE_SYSTEM;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/854b0537/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 57eea6f..1ec34bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
 import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
@@ -381,7 +382,11 @@ public final class MapRFileSystem extends FileSystem {
 
 	@Override
 	public boolean isDistributedFS() {
-
 		return true;
 	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM;
+	}
 }