Posted to commits@flink.apache.org by sr...@apache.org on 2017/06/15 08:35:09 UTC

flink git commit: [FLINK-6776] [runtime] Use skip instead of seek for small forward repositioning in DFS streams

Repository: flink
Updated Branches:
  refs/heads/master 59bd8bec1 -> 9141379f6


[FLINK-6776] [runtime] Use skip instead of seek for small forward repositioning in DFS streams


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9141379f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9141379f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9141379f

Branch: refs/heads/master
Commit: 9141379f6d2654886d48154b453170cc23b89a87
Parents: 59bd8be
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue May 30 17:08:26 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jun 15 10:34:38 2017 +0200

----------------------------------------------------------------------
 .../runtime/fs/hdfs/HadoopDataInputStream.java  | 64 +++++++++++++++++---
 1 file changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
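
In short: HadoopDataInputStream#seek(long) now computes the delta between the desired and the current position. Positive deltas of at most 1 MiB are consumed by the new skipFully(long) helper (read and discard), while larger forward jumps and all backward repositionings go through the new forceSeek(long), which always issues a real seek on the wrapped Hadoop stream. A caller-side usage sketch follows after the diff.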


http://git-wip-us.apache.org/repos/asf/flink/blob/9141379f/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 47b63f0..6e3b065 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.fs.hdfs;
 import org.apache.flink.core.fs.FSDataInputStream;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -31,11 +32,27 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public final class HadoopDataInputStream extends FSDataInputStream {
 
+	/**
+	 * Minimum number of bytes a forward seek must cover before we issue a real seek instead of reading and discarding.
+	 * <p>
+	 * The current value is just a magic number. In the long run, this value could become configurable, but for now it
+	 * is a conservative, relatively small value that should bring safe improvements for small skips (e.g. when
+	 * reading metadata), which are hurt the most by frequent seeks.
+	 * <p>
+	 * The optimal value depends on the DFS implementation and configuration, plus the underlying file system.
+	 * For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
+	 * avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
+	 * be the number of bytes that can be consumed sequentially within the seek time. Unfortunately, the seek time is
+	 * not constant, and devices, the OS, and the DFS potentially also use read buffers and read-ahead.
+	 */
+	private static final int MIN_SKIP_BYTES = 1024 * 1024;
+
+	/** The wrapped Hadoop input stream. */
 	private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
 
 	/**
 	 * Creates a new data input stream from the given Hadoop input stream
-	 * 
+	 *
 	 * @param fsDataInputStream The Hadoop input stream
 	 */
 	public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
@@ -44,12 +61,18 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 
 
 	@Override
-	public void seek(long desired) throws IOException {
-		// This optimization prevents some implementations of distributed FS to perform expensive seeks when they are
-		// actually not needed
-		if (desired != getPos()) {
-			fsDataInputStream.seek(desired);
-		}
+	public void seek(long seekPos) throws IOException {
+		// We do some optimizations to prevent some implementations of distributed file systems
+		// from performing expensive seeks when they are actually not needed.
+		long delta = seekPos - getPos();
+
+		if (delta > 0L && delta <= MIN_SKIP_BYTES) {
+			// Instead of a small forward seek, we skip over the gap
+			skipFully(delta);
+		} else if (delta != 0L) {
+			// For larger gaps and backward seeks, we do a real seek
+			forceSeek(seekPos);
+		} // Do nothing if delta is zero.
 	}
 
 	@Override
@@ -71,7 +94,7 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 	public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
 		return fsDataInputStream.read(buffer, offset, length);
 	}
-	
+
 	@Override
 	public int available() throws IOException {
 		return fsDataInputStream.available();
@@ -89,4 +112,29 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 	public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
 		return fsDataInputStream;
 	}
+
+	/**
+	 * Positions the stream at the given location. In contrast to {@link #seek(long)}, this method will
+	 * always issue a "seek" command to the DFS and will not replace it with {@link #skip(long)} for small seeks.
+	 * <p>
+	 * Notice that the underlying DFS implementation can still decide to skip instead of seek.
+	 *
+	 * @param seekPos the position to seek to.
+	 * @throws IOException thrown if an I/O error occurs while seeking.
+	 */
+	public void forceSeek(long seekPos) throws IOException {
+		fsDataInputStream.seek(seekPos);
+	}
+
+	/**
+	 * Skips over the given number of bytes in the stream.
+	 *
+	 * @param bytes the number of bytes to skip.
+	 * @throws IOException thrown if an I/O error occurs while skipping.
+	 */
+	public void skipFully(long bytes) throws IOException {
+		while (bytes > 0) {
+			bytes -= fsDataInputStream.skip(bytes);
+		}
+	}
 }
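
----------------------------------------------------------------------

For illustration, a minimal caller-side sketch of the new behavior. This is not part of the commit: the class name SeekVsSkipExample and the file path are made up, and a reachable Hadoop FileSystem is assumed (with an empty Configuration this is typically the local file system).

import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekVsSkipExample {

	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new Configuration());

		// Hypothetical path; any existing file larger than a few MiB works.
		try (HadoopDataInputStream in =
				new HadoopDataInputStream(fs.open(new Path("/tmp/example.bin")))) {

			byte[] header = new byte[128];
			in.read(header, 0, header.length);

			// Small forward reposition: delta is positive and <= MIN_SKIP_BYTES (1 MiB),
			// so seek() reads and discards the gap via skipFully(delta) instead of
			// issuing a seek on the wrapped Hadoop stream.
			in.seek(in.getPos() + 4 * 1024);

			// Large forward jump: delta exceeds the threshold, so seek() falls
			// through to forceSeek(seekPos), i.e. a real seek.
			in.seek(in.getPos() + 8 * 1024 * 1024);

			// Backward reposition: delta is negative, always a real seek.
			in.seek(0L);
		}
	}
}

Two design notes: the 1 MiB threshold is, per the Javadoc above, a deliberately conservative magic number rather than a measured optimum; and skipFully(long) loops because InputStream#skip(long) is allowed to skip fewer bytes than requested, so it relies on the underlying Hadoop stream making progress on every call.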