You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/06/15 12:07:37 UTC

flink git commit: [hotfix] Additional tests for HadoopDataInputStream#skip vs #seek

Repository: flink
Updated Branches:
  refs/heads/master d8756553c -> f27cb2ac9


[hotfix] Additional tests for HadoopDataInputStream#skip vs #seek


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f27cb2ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f27cb2ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f27cb2ac

Branch: refs/heads/master
Commit: f27cb2ac990a26fae2b6c68474ebd35cda1fe001
Parents: d875655
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jun 15 11:48:32 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jun 15 14:07:28 2017 +0200

----------------------------------------------------------------------
 .../runtime/fs/hdfs/HadoopDataInputStream.java  |   4 +-
 .../fs/hdfs/HadoopDataInputStreamTest.java      | 128 +++++++++++++++++++
 2 files changed, 130 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f27cb2ac/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 6e3b065..3cc841e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -45,7 +45,7 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 	 * be the amounts of bytes the can be consumed sequentially within the seektime. Unfortunately, seektime is not
 	 * constant and devices, OS, and DFS potentially also use read buffers and read-ahead.
 	 */
-	private static final int MIN_SKIP_BYTES = 1024 * 1024;
+	public static final int MIN_SKIP_BYTES = 1024 * 1024;
 
 	/** The internal stream */
 	private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
@@ -71,7 +71,7 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 			skipFully(delta);
 		} else if (delta != 0L) {
 			// For larger gaps and backward seeks, we do a real seek
-			forceSeek(delta);
+			forceSeek(seekPos);
 		} // Do nothing if delta is zero.
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f27cb2ac/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
new file mode 100644
index 0000000..58de3db
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public class HadoopDataInputStreamTest { // Checks whether HadoopDataInputStream#seek reaches the wrapped Hadoop stream as skip(...) or as seek(...).
+
+	private FSDataInputStream verifyInputStream; // Mockito spy on the wrapped Hadoop stream; used to verify which calls were made
+	private HadoopDataInputStream testInputStream; // stream under test; wraps verifyInputStream
+
+	@Test
+	public void testSeekSkip() throws IOException { // Walks through forward, backward, zero-length and negative-target seeks.
+		verifyInputStream = spy(new FSDataInputStream(new SeekableByteArrayInputStream(new byte[2 * HadoopDataInputStream.MIN_SKIP_BYTES]))); // backing buffer is 2 * MIN_SKIP_BYTES long
+		testInputStream = new HadoopDataInputStream(verifyInputStream);
+		seekAndAssert(10); // absolute position 10
+		seekAndAssert(10 + HadoopDataInputStream.MIN_SKIP_BYTES + 1); // delta = MIN_SKIP_BYTES + 1: just above the threshold
+		seekAndAssert(testInputStream.getPos() - 1); // one byte backward
+		seekAndAssert(testInputStream.getPos() + 1); // one byte forward
+		seekAndAssert(testInputStream.getPos() - HadoopDataInputStream.MIN_SKIP_BYTES); // backward by MIN_SKIP_BYTES
+		seekAndAssert(testInputStream.getPos()); // zero delta
+		seekAndAssert(0); // absolute position 0
+		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES); // delta = MIN_SKIP_BYTES: exactly at the threshold
+		seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES - 1); // delta = MIN_SKIP_BYTES - 1: just below the threshold
+
+		try { // a negative target position is expected to raise an exception
+			seekAndAssert(-1);
+			Assert.fail(); // reached only if no exception was thrown
+		} catch (Exception ignore) { // any Exception counts as success; the AssertionError from Assert.fail() is an Error and is not caught here
+		}
+
+		try { // same expectation for a negative target further than MIN_SKIP_BYTES below zero
+			seekAndAssert(-HadoopDataInputStream.MIN_SKIP_BYTES - 1);
+			Assert.fail(); // reached only if no exception was thrown
+		} catch (Exception ignore) { // NOTE(review): the exception type is not checked, so any failure inside seekAndAssert passes this step
+		}
+	}
+
+	private void seekAndAssert(long seekPos) throws IOException { // Seeks testInputStream to seekPos, then verifies the calls recorded on the spy and the resulting position.
+		Assert.assertEquals(verifyInputStream.getPos(), testInputStream.getPos()); // both streams must agree on the position before the seek
+		long delta = seekPos - testInputStream.getPos(); // signed distance from the current position to the target
+		testInputStream.seek(seekPos);
+
+		if (delta > 0L && delta <= HadoopDataInputStream.MIN_SKIP_BYTES) { // forward by at most MIN_SKIP_BYTES: expect skip(...) and no seek(...)
+			verify(verifyInputStream, atLeastOnce()).skip(anyLong());
+			verify(verifyInputStream, never()).seek(anyLong());
+		} else if (delta != 0L) { // larger forward move or any backward move: expect seek(seekPos) and no skip(...)
+			verify(verifyInputStream, atLeastOnce()).seek(seekPos);
+			verify(verifyInputStream, never()).skip(anyLong());
+		} else { // zero delta: expect neither call on the wrapped stream
+			verify(verifyInputStream, never()).seek(anyLong());
+			verify(verifyInputStream, never()).skip(anyLong());
+		}
+
+		Assert.assertEquals(seekPos, verifyInputStream.getPos()); // the wrapped stream must end up at the requested position
+		reset(verifyInputStream); // forget recorded interactions so the next call verifies only its own
+	}
+
+	private static final class SeekableByteArrayInputStream // in-memory stream that adds Hadoop's Seekable and PositionedReadable, as passed to FSDataInputStream in testSeekSkip
+		extends ByteArrayInputStreamWithPos
+		implements Seekable, PositionedReadable {
+
+		public SeekableByteArrayInputStream(byte[] buffer) {
+			super(buffer);
+		}
+
+		@Override
+		public void seek(long pos) throws IOException {
+			setPosition((int) pos); // narrowing cast: a pos outside the int range would be truncated
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return getPosition();
+		}
+
+		@Override
+		public boolean seekToNewSource(long targetPos) throws IOException {
+			return false; // always reports false; targetPos is ignored
+		}
+
+		@Override
+		public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException(); // positioned reads are not implemented by this test stream
+		}
+
+		@Override
+		public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+			throw new UnsupportedOperationException(); // positioned reads are not implemented by this test stream
+		}
+
+		@Override
+		public void readFully(long position, byte[] buffer) throws IOException {
+			throw new UnsupportedOperationException(); // positioned reads are not implemented by this test stream
+		}
+	}
+}