You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/06/15 12:07:37 UTC
flink git commit: [hotfix] Additional tests for HadoopDataInputStream#skip vs #seek
Repository: flink
Updated Branches:
refs/heads/master d8756553c -> f27cb2ac9
[hotfix] Additional tests for HadoopDataInputStream#skip vs #seek
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f27cb2ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f27cb2ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f27cb2ac
Branch: refs/heads/master
Commit: f27cb2ac990a26fae2b6c68474ebd35cda1fe001
Parents: d875655
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jun 15 11:48:32 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jun 15 14:07:28 2017 +0200
----------------------------------------------------------------------
.../runtime/fs/hdfs/HadoopDataInputStream.java | 4 +-
.../fs/hdfs/HadoopDataInputStreamTest.java | 128 +++++++++++++++++++
2 files changed, 130 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f27cb2ac/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 6e3b065..3cc841e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -45,7 +45,7 @@ public final class HadoopDataInputStream extends FSDataInputStream {
* be the amounts of bytes the can be consumed sequentially within the seektime. Unfortunately, seektime is not
* constant and devices, OS, and DFS potentially also use read buffers and read-ahead.
*/
- private static final int MIN_SKIP_BYTES = 1024 * 1024;
+ public static final int MIN_SKIP_BYTES = 1024 * 1024;
/** The internal stream */
private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
@@ -71,7 +71,7 @@ public final class HadoopDataInputStream extends FSDataInputStream {
skipFully(delta);
} else if (delta != 0L) {
// For larger gaps and backward seeks, we do a real seek
- forceSeek(delta);
+ forceSeek(seekPos);
} // Do nothing if delta is zero.
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f27cb2ac/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
new file mode 100644
index 0000000..58de3db
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public class HadoopDataInputStreamTest {
+
+ private FSDataInputStream verifyInputStream;
+ private HadoopDataInputStream testInputStream;
+
+ @Test
+ public void testSeekSkip() throws IOException {
+ verifyInputStream = spy(new FSDataInputStream(new SeekableByteArrayInputStream(new byte[2 * HadoopDataInputStream.MIN_SKIP_BYTES])));
+ testInputStream = new HadoopDataInputStream(verifyInputStream);
+ seekAndAssert(10);
+ seekAndAssert(10 + HadoopDataInputStream.MIN_SKIP_BYTES + 1);
+ seekAndAssert(testInputStream.getPos() - 1);
+ seekAndAssert(testInputStream.getPos() + 1);
+ seekAndAssert(testInputStream.getPos() - HadoopDataInputStream.MIN_SKIP_BYTES);
+ seekAndAssert(testInputStream.getPos());
+ seekAndAssert(0);
+ seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES);
+ seekAndAssert(testInputStream.getPos() + HadoopDataInputStream.MIN_SKIP_BYTES - 1);
+
+ try {
+ seekAndAssert(-1);
+ Assert.fail();
+ } catch (Exception ignore) {
+ }
+
+ try {
+ seekAndAssert(-HadoopDataInputStream.MIN_SKIP_BYTES - 1);
+ Assert.fail();
+ } catch (Exception ignore) {
+ }
+ }
+
+ private void seekAndAssert(long seekPos) throws IOException {
+ Assert.assertEquals(verifyInputStream.getPos(), testInputStream.getPos());
+ long delta = seekPos - testInputStream.getPos();
+ testInputStream.seek(seekPos);
+
+ if (delta > 0L && delta <= HadoopDataInputStream.MIN_SKIP_BYTES) {
+ verify(verifyInputStream, atLeastOnce()).skip(anyLong());
+ verify(verifyInputStream, never()).seek(anyLong());
+ } else if (delta != 0L) {
+ verify(verifyInputStream, atLeastOnce()).seek(seekPos);
+ verify(verifyInputStream, never()).skip(anyLong());
+ } else {
+ verify(verifyInputStream, never()).seek(anyLong());
+ verify(verifyInputStream, never()).skip(anyLong());
+ }
+
+ Assert.assertEquals(seekPos, verifyInputStream.getPos());
+ reset(verifyInputStream);
+ }
+
+ private static final class SeekableByteArrayInputStream
+ extends ByteArrayInputStreamWithPos
+ implements Seekable, PositionedReadable {
+
+ public SeekableByteArrayInputStream(byte[] buffer) {
+ super(buffer);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ setPosition((int) pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return getPosition();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+}