You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:49 UTC
[flink] 01/11: [hotfix][hadoop] Minor code cleanups in HadoopFileStatus
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 070aacb6a9c0e4cdef50b220e78f6f1c8b134155
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 15:58:25 2020 +0200
[hotfix][hadoop] Minor code cleanups in HadoopFileStatus
---
.../flink/runtime/fs/hdfs/HadoopBlockLocation.java | 55 +++++++++++-----------
.../flink/runtime/fs/hdfs/HadoopFileStatus.java | 20 +++-----
2 files changed, 34 insertions(+), 41 deletions(-)
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
index 1484c95..2880bae 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Implementation of the {@link BlockLocation} interface for the
* Hadoop Distributed File System.
@@ -57,19 +59,16 @@ public final class HadoopBlockLocation implements BlockLocation {
* the original HDFS block location
*/
public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
-
- this.blockLocation = blockLocation;
+ this.blockLocation = checkNotNull(blockLocation, "blockLocation");
}
@Override
public String[] getHosts() throws IOException {
- /**
- * Unfortunately, the Hadoop API is not precise about if the list returned by BlockLocation.getHosts() contains
- * the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
- * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
- * sure it does not contain the domain suffix.
- */
+ // Unfortunately, the Hadoop API is not precise about whether the list returned by BlockLocation.getHosts() contains
+ // the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
+ // depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
+ // sure it does not contain the domain suffix.
if (this.hostnames == null) {
final String[] hadoopHostnames = blockLocation.getHosts();
@@ -83,6 +82,26 @@ public final class HadoopBlockLocation implements BlockLocation {
return this.hostnames;
}
+ @Override
+ public long getLength() {
+ return this.blockLocation.getLength();
+ }
+
+ @Override
+ public long getOffset() {
+ return this.blockLocation.getOffset();
+ }
+
+ @Override
+ public int compareTo(final BlockLocation o) {
+ final long diff = getOffset() - o.getOffset();
+ return diff < 0 ? -1 : diff > 0 ? 1 : 0;
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
/**
* Looks for a domain suffix in a FQDN and strips it if present.
*
@@ -110,24 +129,4 @@ public final class HadoopBlockLocation implements BlockLocation {
return originalHostname.substring(0, index);
}
-
- @Override
- public long getLength() {
-
- return this.blockLocation.getLength();
- }
-
- @Override
- public long getOffset() {
-
- return this.blockLocation.getOffset();
- }
-
- @Override
- public int compareTo(final BlockLocation o) {
-
- final long diff = getOffset() - o.getOffset();
-
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
}
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
index 08d31de..2346d92 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -27,7 +27,7 @@ import org.apache.flink.core.fs.Path;
*/
public final class HadoopFileStatus implements FileStatus {
- private org.apache.hadoop.fs.FileStatus fileStatus;
+ private final org.apache.hadoop.fs.FileStatus fileStatus;
/**
* Creates a new file status from an HDFS file status.
@@ -46,12 +46,7 @@ public final class HadoopFileStatus implements FileStatus {
@Override
public long getBlockSize() {
- long blocksize = fileStatus.getBlockSize();
- if (blocksize > fileStatus.getLen()) {
- return fileStatus.getLen();
- }
-
- return blocksize;
+ return Math.min(fileStatus.getBlockSize(), fileStatus.getLen());
}
@Override
@@ -69,18 +64,17 @@ public final class HadoopFileStatus implements FileStatus {
return fileStatus.getReplication();
}
- public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
- return this.fileStatus;
- }
-
@Override
public Path getPath() {
return new Path(fileStatus.getPath().toUri());
}
- @SuppressWarnings("deprecation")
@Override
public boolean isDir() {
- return fileStatus.isDir();
+ return fileStatus.isDirectory();
+ }
+
+ public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
+ return this.fileStatus;
}
}