You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not reproduced in this text.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:49 UTC

[flink] 01/11: [hotfix][hadoop] Minor code cleanups in HadoopFileStatus

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 070aacb6a9c0e4cdef50b220e78f6f1c8b134155
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 15:58:25 2020 +0200

    [hotfix][hadoop] Minor code cleanups in HadoopFileStatus
---
 .../flink/runtime/fs/hdfs/HadoopBlockLocation.java | 55 +++++++++++-----------
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java    | 20 +++-----
 2 files changed, 34 insertions(+), 41 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
index 1484c95..2880bae 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Implementation of the {@link BlockLocation} interface for the
  * Hadoop Distributed File System.
@@ -57,19 +59,16 @@ public final class HadoopBlockLocation implements BlockLocation {
 	 *        the original HDFS block location
 	 */
 	public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
-
-		this.blockLocation = blockLocation;
+		this.blockLocation = checkNotNull(blockLocation, "blockLocation");
 	}
 
 	@Override
 	public String[] getHosts() throws IOException {
 
-		/**
-		 * Unfortunately, the Hadoop API is not precise about if the list returned by BlockLocation.getHosts() contains
-		 * the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
-		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
-		 * sure it does not contain the domain suffix.
-		 */
+		// Unfortunately, the Hadoop API is not precise about if the list returned by BlockLocation.getHosts() contains
+		// the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
+		//depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
+		//sure it does not contain the domain suffix.
 		if (this.hostnames == null) {
 
 			final String[] hadoopHostnames = blockLocation.getHosts();
@@ -83,6 +82,26 @@ public final class HadoopBlockLocation implements BlockLocation {
 		return this.hostnames;
 	}
 
+	@Override
+	public long getLength() {
+		return this.blockLocation.getLength();
+	}
+
+	@Override
+	public long getOffset() {
+		return this.blockLocation.getOffset();
+	}
+
+	@Override
+	public int compareTo(final BlockLocation o) {
+		final long diff = getOffset() - o.getOffset();
+		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Looks for a domain suffix in a FQDN and strips it if present.
 	 *
@@ -110,24 +129,4 @@ public final class HadoopBlockLocation implements BlockLocation {
 
 		return originalHostname.substring(0, index);
 	}
-
-	@Override
-	public long getLength() {
-
-		return this.blockLocation.getLength();
-	}
-
-	@Override
-	public long getOffset() {
-
-		return this.blockLocation.getOffset();
-	}
-
-	@Override
-	public int compareTo(final BlockLocation o) {
-
-		final long diff = getOffset() - o.getOffset();
-
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
 }
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
index 08d31de..2346d92 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -27,7 +27,7 @@ import org.apache.flink.core.fs.Path;
  */
 public final class HadoopFileStatus implements FileStatus {
 
-	private org.apache.hadoop.fs.FileStatus fileStatus;
+	private final org.apache.hadoop.fs.FileStatus fileStatus;
 
 	/**
 	 * Creates a new file status from an HDFS file status.
@@ -46,12 +46,7 @@ public final class HadoopFileStatus implements FileStatus {
 
 	@Override
 	public long getBlockSize() {
-		long blocksize = fileStatus.getBlockSize();
-		if (blocksize > fileStatus.getLen()) {
-			return fileStatus.getLen();
-		}
-
-		return blocksize;
+		return Math.min(fileStatus.getBlockSize(), fileStatus.getLen());
 	}
 
 	@Override
@@ -69,18 +64,17 @@ public final class HadoopFileStatus implements FileStatus {
 		return fileStatus.getReplication();
 	}
 
-	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
-		return this.fileStatus;
-	}
-
 	@Override
 	public Path getPath() {
 		return new Path(fileStatus.getPath().toUri());
 	}
 
-	@SuppressWarnings("deprecation")
 	@Override
 	public boolean isDir() {
-		return fileStatus.isDir();
+		return fileStatus.isDirectory();
+	}
+
+	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
+		return this.fileStatus;
 	}
 }