Posted to commits@flink.apache.org by sr...@apache.org on 2017/06/15 08:35:09 UTC
flink git commit: [FLINK-6776] [runtime] Use skip instead of seek for small forward repositioning in DFS streams
Repository: flink
Updated Branches:
refs/heads/master 59bd8bec1 -> 9141379f6
[FLINK-6776] [runtime] Use skip instead of seek for small forward repositioning in DFS streams
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9141379f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9141379f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9141379f
Branch: refs/heads/master
Commit: 9141379f6d2654886d48154b453170cc23b89a87
Parents: 59bd8be
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue May 30 17:08:26 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jun 15 10:34:38 2017 +0200
----------------------------------------------------------------------
.../runtime/fs/hdfs/HadoopDataInputStream.java | 64 +++++++++++++++++---
1 file changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9141379f/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 47b63f0..6e3b065 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.FSDataInputStream;
import javax.annotation.Nonnull;
+
import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -31,11 +32,27 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public final class HadoopDataInputStream extends FSDataInputStream {
+ /**
+ * Threshold for small forward repositioning: forward seeks of at most this many bytes are served by
+ * reading and discarding bytes (a skip), while larger gaps and all backward seeks issue a real seek.
+ * <p>
+ * The current value is just a magic number. In the long run, this value could become configurable, but for now it
+ * is a conservative, relatively small value that should bring safe improvements for small skips (e.g. in reading
+ * metadata), which hurt the most when turned into frequent seeks.
+ * <p>
+ * The optimal value depends on the DFS implementation and configuration, plus the underlying filesystem.
+ * For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
+ * avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
+ * be the number of bytes that can be consumed sequentially within the seek time. Unfortunately, seek time is not
+ * constant, and devices, OS, and DFS potentially also use read buffers and read-ahead.
+ */
+ private static final int MIN_SKIP_BYTES = 1024 * 1024;
+
+ /** The internal stream. */
private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
/**
* Creates a new data input stream from the given Hadoop input stream
- *
+ *
* @param fsDataInputStream The Hadoop input stream
*/
public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
@@ -44,12 +61,18 @@ public final class HadoopDataInputStream extends FSDataInputStream {
@Override
- public void seek(long desired) throws IOException {
- // This optimization prevents some implementations of distributed FS to perform expensive seeks when they are
- // actually not needed
- if (desired != getPos()) {
- fsDataInputStream.seek(desired);
- }
+ public void seek(long seekPos) throws IOException {
+ // We apply a small optimization so that certain distributed FS implementations do not
+ // perform expensive seeks when they are not actually needed.
+ long delta = seekPos - getPos();
+
+ if (delta > 0L && delta <= MIN_SKIP_BYTES) {
+ // Instead of a small forward seek, we skip over the gap
+ skipFully(delta);
+ } else if (delta != 0L) {
+ // For larger gaps and backward seeks, we do a real seek
+ forceSeek(seekPos);
+ } // Do nothing if delta is zero.
}
@Override
@@ -71,7 +94,7 @@ public final class HadoopDataInputStream extends FSDataInputStream {
public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
return fsDataInputStream.read(buffer, offset, length);
}
-
+
@Override
public int available() throws IOException {
return fsDataInputStream.available();
@@ -89,4 +112,29 @@ public final class HadoopDataInputStream extends FSDataInputStream {
public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
return fsDataInputStream;
}
+
+ /**
+ * Positions the stream at the given location. In contrast to {@link #seek(long)}, this method will
+ * always issue a "seek" command to the DFS and will not replace it with {@link #skip(long)} for small
+ * forward seeks.
+ * <p>
+ * Notice that the underlying DFS implementation can still decide to skip instead of seek.
+ *
+ * @param seekPos the position to seek to.
+ * @throws IOException
+ */
+ public void forceSeek(long seekPos) throws IOException {
+ fsDataInputStream.seek(seekPos);
+ }
+
+ /**
+ * Skips over the given number of bytes in the stream, looping until all bytes have been skipped.
+ *
+ * @param bytes the number of bytes to skip.
+ * @throws IOException
+ */
+ public void skipFully(long bytes) throws IOException {
+ while (bytes > 0) {
+ bytes -= fsDataInputStream.skip(bytes);
+ }
+ }
}
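----------------------------------------------------------------------
For context, here is a minimal usage sketch of the new behavior (not part of the commit; the file
path and setup are placeholders): small forward repositioning is served by a skip, larger gaps and
backward moves by a real seek, and forceSeek(long) bypasses the heuristic entirely.

    import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SeekSkipSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // Placeholder path; any file readable through the Hadoop FileSystem API works.
            try (HadoopDataInputStream in =
                    new HadoopDataInputStream(fs.open(new Path("/tmp/example-file")))) {

                in.seek(512);                    // delta <= 1 MiB: skipped by discarding reads
                in.seek(512 + 4 * 1024 * 1024);  // delta > 1 MiB: a real DFS seek
                in.seek(0);                      // backward: always a real seek
                in.forceSeek(512);               // always a real seek, regardless of distance
            }
        }
    }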
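----------------------------------------------------------------------
One caveat worth noting: java.io.InputStream#skip(long) is allowed to skip zero bytes, so the
skipFully(long) loop above can spin indefinitely if the underlying stream stops making progress
(e.g. at end of stream). A defensive variant could look like the following sketch; the method name
is hypothetical and this is not part of the commit:

    import java.io.EOFException;
    import java.io.IOException;

    // Hypothetical alternative, written as if it were a member of HadoopDataInputStream.
    public void skipFullyChecked(long bytes) throws IOException {
        while (bytes > 0) {
            long skipped = fsDataInputStream.skip(bytes);
            if (skipped > 0) {
                bytes -= skipped;
            } else if (fsDataInputStream.read() >= 0) {
                // skip() made no progress but the stream is not exhausted: consume one byte.
                bytes--;
            } else {
                throw new EOFException("Unexpected end of stream while skipping " + bytes + " bytes.");
            }
        }
    }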