You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:51 UTC

[flink] 03/11: [FLINK-19221][core][hadoop] Introduce the LocatedFileStatus to save block location RPC requests.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7375589cc17d801af1306c3bb7563fc539b91d01
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 17:33:49 2020 +0200

    [FLINK-19221][core][hadoop] Introduce the LocatedFileStatus to save block location RPC requests.
    
    When the HDFS Client returns a FileStatus (description of a file) it frequently returns a
    'LocatedFileStatus' which already contains all the BlockLocation information.
    
    Here we expose this on the Flink side to save RPC calls to the Name Node. For example, file sources often
    request block locations for all files (to facilitate locality aware assignments), currently resulting in
    one RPC call to the Name Node for each file.
    
    When the FileStatus obtained from listing the directory (or getting details for a file) already has
    the block locations, we can save the extra RPC call per file to obtain that block location information.
    
    This closes #13394
---
 .../apache/flink/core/fs/LocatedFileStatus.java    | 43 +++++++++++++++++++
 .../flink/core/fs/local/LocalFileStatus.java       | 22 ++++++++--
 .../flink/core/fs/local/LocalFileSystem.java       |  7 ++--
 .../fs/anotherdummy/AnotherDummyFSFileSystem.java  |  4 +-
 .../apache/flink/fs/dummy/DummyFSFileSystem.java   |  4 +-
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java    | 17 +++++++-
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    | 18 +++++---
 .../runtime/fs/hdfs/LocatedHadoopFileStatus.java   | 48 ++++++++++++++++++++++
 8 files changed, 144 insertions(+), 19 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/LocatedFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/LocatedFileStatus.java
new file mode 100644
index 0000000..f391b19
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/LocatedFileStatus.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * A {@code LocatedFileStatus} is a {@link FileStatus} that additionally contains the location information
+ * of the file directly. The information is accessible through the {@link #getBlockLocations()} method.
+ *
+ * <p>Implementations eagerly expose the block information (including locations) when that information
+ * is readily (or cheaply) available. That way users can avoid an additional call to
+ * {@link FileSystem#getFileBlockLocations(FileStatus, long, long)}, which is an additional RPC call for
+ * each file.
+ */
+@Public
+public interface LocatedFileStatus extends FileStatus {
+
+	/**
+	 * Gets the location information for the file. The location is given per block, because each block
+	 * may potentially live at a different location.
+	 *
+	 * <p>Files without location information typically expose one block with no host information
+	 * for that block.
+	 */
+	BlockLocation[] getBlockLocations();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 781e0d3..1b34c4f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -19,8 +19,10 @@
 package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.LocatedFileStatus;
 import org.apache.flink.core.fs.Path;
 
 import java.io.File;
@@ -30,7 +32,7 @@ import java.io.File;
  * for the local file system.
  */
 @Internal
-public class LocalFileStatus implements FileStatus {
+public class LocalFileStatus implements LocatedFileStatus {
 
 	/**
 	 * The file this file status belongs to.
@@ -43,6 +45,11 @@ public class LocalFileStatus implements FileStatus {
 	private final Path path;
 
 	/**
+	 * File length, read once in the constructor to avoid repeated syscalls; not refreshed if the file changes.
+	 */
+	private final long len;
+
+	/**
 	 * Creates a <code>LocalFileStatus</code> object from a given {@link File} object.
 	 *
 	 * @param f
@@ -53,6 +60,7 @@ public class LocalFileStatus implements FileStatus {
 	public LocalFileStatus(final File f, final FileSystem fs) {
 		this.file = f;
 		this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
+		this.len = f.length();
 	}
 
 	@Override
@@ -62,12 +70,12 @@ public class LocalFileStatus implements FileStatus {
 
 	@Override
 	public long getBlockSize() {
-		return this.file.length();
+		return this.len;
 	}
 
 	@Override
 	public long getLen() {
-		return this.file.length();
+		return this.len;
 	}
 
 	@Override
@@ -90,6 +98,14 @@ public class LocalFileStatus implements FileStatus {
 		return this.path;
 	}
 
+	@Override
+	public BlockLocation[] getBlockLocations() {
+		// Built on demand and deliberately not cached, since callers need this only rarely.
+		// The local file is exposed as a single LocalBlockLocation of the cached length.
+		final BlockLocation singleBlock = new LocalBlockLocation(this.len);
+		return new BlockLocation[] {singleBlock};
+	}
+
 	public File getFile() {
 		return this.file;
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 154fc2c..ae7beed 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -82,9 +82,10 @@ public class LocalFileSystem extends FileSystem {
 
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
-		return new BlockLocation[] {
-				new LocalBlockLocation(file.getLen())
-		};
+		if (file instanceof LocalFileStatus) { // the status already knows its (single) block
+			return ((LocalFileStatus) file).getBlockLocations();
+		}
+		return new BlockLocation[] {new LocalBlockLocation(file.getLen())}; // other FileStatus types: one block, don't fail
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java b/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java
index 99a5b47..31ad157 100644
--- a/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java
+++ b/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java
@@ -45,8 +45,6 @@ class AnotherDummyFSFileSystem extends FileSystem {
 
 	static final URI FS_URI = URI.create("anotherDummy:///");
 
-	private static final String HOSTNAME = "localhost";
-
 	private final URI workingDir;
 
 	private final URI homeDir;
@@ -93,7 +91,7 @@ class AnotherDummyFSFileSystem extends FileSystem {
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 		return new BlockLocation[] {
-			new LocalBlockLocation(HOSTNAME, file.getLen())
+			new LocalBlockLocation(file.getLen())
 		};
 	}
 
diff --git a/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java b/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java
index ac11807..236d0d5 100644
--- a/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java
+++ b/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java
@@ -45,8 +45,6 @@ class DummyFSFileSystem extends FileSystem {
 
 	static final URI FS_URI = URI.create("dummy:///");
 
-	private static final String HOSTNAME = "localhost";
-
 	private final URI workingDir;
 
 	private final URI homeDir;
@@ -93,7 +91,7 @@ class DummyFSFileSystem extends FileSystem {
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 		return new BlockLocation[] {
-			new LocalBlockLocation(HOSTNAME, file.getLen())
+			new LocalBlockLocation(file.getLen())
 		};
 	}
 
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
index 2346d92..2d21498 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -23,9 +23,9 @@ import org.apache.flink.core.fs.Path;
 
 /**
  * Concrete implementation of the {@link FileStatus} interface for the
- * Hadoop Distribution File System.
+ * Hadoop Distributed File System.
  */
-public final class HadoopFileStatus implements FileStatus {
+public class HadoopFileStatus implements FileStatus {
 
 	private final org.apache.hadoop.fs.FileStatus fileStatus;
 
@@ -77,4 +77,17 @@ public final class HadoopFileStatus implements FileStatus {
 	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
 		return this.fileStatus;
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Wraps Hadoop's {@link org.apache.hadoop.fs.FileStatus} in a Flink {@code HadoopFileStatus}. Located Hadoop
+	 * statuses (those carrying block information) yield a {@link org.apache.flink.core.fs.LocatedFileStatus}.
+	 */
+	public static HadoopFileStatus fromHadoopStatus(final org.apache.hadoop.fs.FileStatus fileStatus) {
+		if (!(fileStatus instanceof org.apache.hadoop.fs.LocatedFileStatus)) {
+			return new HadoopFileStatus(fileStatus);
+		}
+		return new LocatedHadoopFileStatus((org.apache.hadoop.fs.LocatedFileStatus) fileStatus);
+	}
 }
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 1135e01..84f4ec7 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -83,7 +83,7 @@ public class HadoopFileSystem extends FileSystem {
 	@Override
 	public FileStatus getFileStatus(final Path f) throws IOException {
 		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f));
-		return new HadoopFileStatus(status);
+		return HadoopFileStatus.fromHadoopStatus(status);
 	}
 
 	@Override
@@ -93,10 +93,18 @@ public class HadoopFileSystem extends FileSystem {
 			throw new IOException("file is not an instance of DistributedFileStatus");
 		}
 
-		final HadoopFileStatus f = (HadoopFileStatus) file;
+		// shortcut - if the status already has the information, return it.
+		if (file instanceof LocatedHadoopFileStatus) {
+			return ((LocatedHadoopFileStatus) file).getBlockLocations();
+		}
+
+		final org.apache.hadoop.fs.FileStatus hadoopStatus = ((HadoopFileStatus) file).getInternalFileStatus();
 
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
-			start, len);
+		// second shortcut - if the internal status already has the information, return it.
+		// only if that is not the case, do the actual HDFS call (RPC to Name Node)
+		final org.apache.hadoop.fs.BlockLocation[] blkLocations = hadoopStatus instanceof org.apache.hadoop.fs.LocatedFileStatus
+				? ((org.apache.hadoop.fs.LocatedFileStatus) hadoopStatus).getBlockLocations()
+				: fs.getFileBlockLocations(hadoopStatus, start, len);
 
 		// Wrap up HDFS specific block location objects
 		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
@@ -159,7 +167,7 @@ public class HadoopFileSystem extends FileSystem {
 
 		// Convert types
 		for (int i = 0; i < files.length; i++) {
-			files[i] = new HadoopFileStatus(hadoopFiles[i]);
+			files[i] = HadoopFileStatus.fromHadoopStatus(hadoopFiles[i]);
 		}
 
 		return files;
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/LocatedHadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/LocatedHadoopFileStatus.java
new file mode 100644
index 0000000..5ee1f6a
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/LocatedHadoopFileStatus.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.LocatedFileStatus;
+
+/**
+ * Concrete implementation of the {@link LocatedFileStatus} interface for Hadoop file systems
+ * whose status objects already carry the block locations (Hadoop's {@code LocatedFileStatus}).
+ */
+public final class LocatedHadoopFileStatus extends HadoopFileStatus implements LocatedFileStatus {
+
+	/** Creates a new located file status from a Hadoop located file status. */
+	public LocatedHadoopFileStatus(org.apache.hadoop.fs.LocatedFileStatus fileStatus) {
+		super(fileStatus);
+	}
+
+	/** Wraps the locations Hadoop already attached to the status (no RPC); empty array if Hadoop has none. */
+	@Override
+	public BlockLocation[] getBlockLocations() {
+		final org.apache.hadoop.fs.BlockLocation[] hadoopLocations =
+				((org.apache.hadoop.fs.LocatedFileStatus) getInternalFileStatus()).getBlockLocations();
+		// Hadoop's LocatedFileStatus may hold null locations (e.g. for directories), so guard against NPE
+		final int numBlocks = hadoopLocations == null ? 0 : hadoopLocations.length;
+		final HadoopBlockLocation[] locations = new HadoopBlockLocation[numBlocks];
+		for (int i = 0; i < numBlocks; i++) {
+			locations[i] = new HadoopBlockLocation(hadoopLocations[i]);
+		}
+		return locations;
+	}
+}