Posted to common-commits@hadoop.apache.org by to...@apache.org on 2019/04/30 21:52:23 UTC
[hadoop] branch trunk updated: HDFS-3246: pRead equivalent for direct read path (#597)
This is an automated email from the ASF dual-hosted git repository.
todd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4877f0a HDFS-3246: pRead equivalent for direct read path (#597)
4877f0a is described below
commit 4877f0aa518832c37a06e6d3bd2c9552fc3141dc
Author: Sahil Takiar <sa...@users.noreply.github.com>
AuthorDate: Tue Apr 30 16:52:16 2019 -0500
HDFS-3246: pRead equivalent for direct read path (#597)
HDFS-3246: pRead equivalent for direct read path
Contributed by Sahil Takiar
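For context, a minimal caller-side sketch of what this change enables (the
PREADBYTEBUFFER capability constant and the read(long, ByteBuffer) overload
are the ones added in this patch; the path, offset, and buffer size are
purely illustrative):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class ByteBufferPreadExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        try (FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
          // Streams that cannot do ByteBuffer preads throw
          // UnsupportedOperationException, so probe the capability first.
          if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
            ByteBuffer buf = ByteBuffer.allocateDirect(4096);
            // Reads up to buf.remaining() bytes from file offset 1024
            // without changing the stream's current position.
            int n = in.read(1024L, buf);
            System.out.println("pread returned " + n + " bytes");
          }
        }
      }
    }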
---
.../apache/hadoop/crypto/CryptoInputStream.java | 292 ++++++++++++++-------
.../hadoop/fs/ByteBufferPositionedReadable.java | 66 +++++
.../org/apache/hadoop/fs/ByteBufferReadable.java | 17 +-
.../org/apache/hadoop/fs/FSDataInputStream.java | 15 +-
.../org/apache/hadoop/fs/StreamCapabilities.java | 6 +
.../hadoop/crypto/CryptoStreamsTestBase.java | 185 ++++++++++++-
.../apache/hadoop/crypto/TestCryptoStreams.java | 35 ++-
.../hadoop/crypto/TestCryptoStreamsForLocalFS.java | 10 +
.../hadoop/crypto/TestCryptoStreamsNormal.java | 10 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 14 +-
.../src/main/native/libhdfs-tests/hdfs_test.h | 18 ++
.../main/native/libhdfs-tests/test_libhdfs_ops.c | 142 +++++++++-
.../src/main/native/libhdfs/hdfs.c | 210 +++++++++++++--
.../apache/hadoop/hdfs/TestByteBufferPread.java | 269 +++++++++++++++++++
14 files changed, 1134 insertions(+), 155 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index 67e8690..80364ce 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
@@ -64,7 +65,8 @@ import org.apache.hadoop.util.StringUtils;
public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
- ReadableByteChannel, CanUnbuffer, StreamCapabilities {
+ ReadableByteChannel, CanUnbuffer, StreamCapabilities,
+ ByteBufferPositionedReadable {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
@@ -327,19 +329,39 @@ public class CryptoInputStream extends FilterInputStream implements
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
checkStream();
- try {
- final int n = ((PositionedReadable) in).read(position, buffer, offset,
- length);
- if (n > 0) {
- // This operation does not change the current offset of the file
- decrypt(position, buffer, offset, n);
- }
-
- return n;
- } catch (ClassCastException e) {
+    if (!(in instanceof PositionedReadable)) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned read.");
}
+ final int n = ((PositionedReadable) in).read(position, buffer, offset,
+ length);
+ if (n > 0) {
+ // This operation does not change the current offset of the file
+ decrypt(position, buffer, offset, n);
+ }
+
+ return n;
+ }
+
+ /**
+ * Positioned read using {@link ByteBuffer}s. This method is thread-safe.
+ */
+ @Override
+ public int read(long position, final ByteBuffer buf)
+ throws IOException {
+ checkStream();
+ if (!(in instanceof ByteBufferPositionedReadable)) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "positioned reads with byte buffers.");
+ }
+ int bufPos = buf.position();
+ final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
+ if (n > 0) {
+ // This operation does not change the current offset of the file
+ decrypt(position, buf, n, bufPos);
+ }
+
+ return n;
}
/**
@@ -348,49 +370,124 @@ public class CryptoInputStream extends FilterInputStream implements
*/
private void decrypt(long position, byte[] buffer, int offset, int length)
throws IOException {
- ByteBuffer inBuffer = getBuffer();
- ByteBuffer outBuffer = getBuffer();
+ ByteBuffer localInBuffer = null;
+ ByteBuffer localOutBuffer = null;
Decryptor decryptor = null;
try {
+ localInBuffer = getBuffer();
+ localOutBuffer = getBuffer();
decryptor = getDecryptor();
byte[] iv = initIV.clone();
updateDecryptor(decryptor, position, iv);
byte padding = getPadding(position);
- inBuffer.position(padding); // Set proper position for input data.
+ localInBuffer.position(padding); // Set proper position for input data.
int n = 0;
while (n < length) {
- int toDecrypt = Math.min(length - n, inBuffer.remaining());
- inBuffer.put(buffer, offset + n, toDecrypt);
+ int toDecrypt = Math.min(length - n, localInBuffer.remaining());
+ localInBuffer.put(buffer, offset + n, toDecrypt);
// Do decryption
- decrypt(decryptor, inBuffer, outBuffer, padding);
+ decrypt(decryptor, localInBuffer, localOutBuffer, padding);
- outBuffer.get(buffer, offset + n, toDecrypt);
+ localOutBuffer.get(buffer, offset + n, toDecrypt);
n += toDecrypt;
- padding = afterDecryption(decryptor, inBuffer, position + n, iv);
+ padding = afterDecryption(decryptor, localInBuffer, position + n, iv);
}
} finally {
- returnBuffer(inBuffer);
- returnBuffer(outBuffer);
+ returnBuffer(localInBuffer);
+ returnBuffer(localOutBuffer);
returnDecryptor(decryptor);
}
}
-
+
+ /**
+ * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
+ * decrypted from {@code buf} starting at {@code start}.
+ * {@code buf.position()} and {@code buf.limit()} are unchanged after this
+ * method returns. This method is thread-safe.
+ *
+ * <p>
+ * This method decrypts the input buf chunk-by-chunk and writes the
+ * decrypted output back into the input buf. It uses two local buffers
+ * taken from the {@link #bufferPool} to assist in this process: one is
+ * designated as the input buffer and it stores a single chunk of the
+ * given buf, the other is designated as the output buffer, which stores
+ * the output of decrypting the input buffer. Both buffers are of size
+ * {@link #bufferSize}.
+ * </p>
+ *
+ * <p>
+ * Decryption is done by using a {@link Decryptor} and the
+ * {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once
+ * the decrypted data is written into the output buffer, it is copied back
+ * into buf. Both buffers are returned back into the pool once the entire
+ * buf is decrypted.
+ * </p>
+ *
+ * @param filePosition the current position of the file being read
+ * @param buf the {@link ByteBuffer} to decrypt
+ * @param length the number of bytes in {@code buf} to decrypt
+ * @param start the position in {@code buf} to start decrypting data from
+ */
+ private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
+ throws IOException {
+ ByteBuffer localInBuffer = null;
+ ByteBuffer localOutBuffer = null;
+
+ // Duplicate the buffer so we don't have to worry about resetting the
+ // original position and limit at the end of the method
+ buf = buf.duplicate();
+
+ int decryptedBytes = 0;
+ Decryptor localDecryptor = null;
+ try {
+ localInBuffer = getBuffer();
+ localOutBuffer = getBuffer();
+ localDecryptor = getDecryptor();
+ byte[] localIV = initIV.clone();
+ updateDecryptor(localDecryptor, filePosition, localIV);
+ byte localPadding = getPadding(filePosition);
+ // Set the proper position for the input data.
+ localInBuffer.position(localPadding);
+
+ while (decryptedBytes < length) {
+ buf.position(start + decryptedBytes);
+ buf.limit(start + decryptedBytes +
+ Math.min(length - decryptedBytes, localInBuffer.remaining()));
+ localInBuffer.put(buf);
+ // Do decryption
+ try {
+ decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
+ buf.position(start + decryptedBytes);
+ buf.limit(start + length);
+ decryptedBytes += localOutBuffer.remaining();
+ buf.put(localOutBuffer);
+ } finally {
+ localPadding = afterDecryption(localDecryptor, localInBuffer,
+ filePosition + length, localIV);
+ }
+ }
+ } finally {
+ returnBuffer(localInBuffer);
+ returnBuffer(localOutBuffer);
+ returnDecryptor(localDecryptor);
+ }
+ }
+
/** Positioned read fully. It is thread-safe */
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
checkStream();
- try {
- ((PositionedReadable) in).readFully(position, buffer, offset, length);
- if (length > 0) {
- // This operation does not change the current offset of the file
- decrypt(position, buffer, offset, length);
- }
- } catch (ClassCastException e) {
+    if (!(in instanceof PositionedReadable)) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned readFully.");
}
+ ((PositionedReadable) in).readFully(position, buffer, offset, length);
+ if (length > 0) {
+ // This operation does not change the current offset of the file
+ decrypt(position, buffer, offset, length);
+ }
}
@Override
@@ -405,23 +502,22 @@ public class CryptoInputStream extends FilterInputStream implements
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
checkStream();
- try {
- /*
- * If data of target pos in the underlying stream has already been read
- * and decrypted in outBuffer, we just need to re-position outBuffer.
- */
- if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
- int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
- if (forward > 0) {
- outBuffer.position(outBuffer.position() + forward);
- }
- } else {
- ((Seekable) in).seek(pos);
- resetStreamOffset(pos);
+ /*
+ * If data of target pos in the underlying stream has already been read
+ * and decrypted in outBuffer, we just need to re-position outBuffer.
+ */
+ if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
+ int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
+ if (forward > 0) {
+ outBuffer.position(outBuffer.position() + forward);
}
- } catch (ClassCastException e) {
- throw new UnsupportedOperationException("This stream does not support " +
- "seek.");
+ } else {
+ if (!(in instanceof Seekable)) {
+ throw new UnsupportedOperationException("This stream does not " +
+ "support seek.");
+ }
+ ((Seekable) in).seek(pos);
+ resetStreamOffset(pos);
}
}
@@ -519,31 +615,34 @@ public class CryptoInputStream extends FilterInputStream implements
}
/**
- * Decrypt all data in buf: total n bytes from given start position.
- * Output is also buf and same start position.
- * buf.position() and buf.limit() should be unchanged after decryption.
+ * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
+ * decrypted from {@code buf} starting at {@code start}.
+ * {@code buf.position()} and {@code buf.limit()} are unchanged after this
+ * method returns.
+ *
+ * @see #decrypt(long, ByteBuffer, int, int)
*/
- private void decrypt(ByteBuffer buf, int n, int start)
+ private void decrypt(ByteBuffer buf, int length, int start)
throws IOException {
- final int pos = buf.position();
- final int limit = buf.limit();
- int len = 0;
- while (len < n) {
- buf.position(start + len);
- buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
+ buf = buf.duplicate();
+ int decryptedBytes = 0;
+ while (decryptedBytes < length) {
+ buf.position(start + decryptedBytes);
+ buf.limit(start + decryptedBytes +
+ Math.min(length - decryptedBytes, inBuffer.remaining()));
inBuffer.put(buf);
// Do decryption
try {
decrypt(decryptor, inBuffer, outBuffer, padding);
- buf.position(start + len);
- buf.limit(limit);
- len += outBuffer.remaining();
+ buf.position(start + decryptedBytes);
+ buf.limit(start + length);
+ decryptedBytes += outBuffer.remaining();
buf.put(outBuffer);
} finally {
- padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv);
+ padding = afterDecryption(decryptor, inBuffer,
+ streamOffset - (length - decryptedBytes), iv);
}
}
- buf.position(pos);
}
@Override
@@ -572,14 +671,13 @@ public class CryptoInputStream extends FilterInputStream implements
Preconditions.checkArgument(targetPos >= 0,
"Cannot seek to negative offset.");
checkStream();
- try {
- boolean result = ((Seekable) in).seekToNewSource(targetPos);
- resetStreamOffset(targetPos);
- return result;
- } catch (ClassCastException e) {
+    if (!(in instanceof Seekable)) {
throw new UnsupportedOperationException("This stream does not support " +
"seekToNewSource.");
}
+ boolean result = ((Seekable) in).seekToNewSource(targetPos);
+ resetStreamOffset(targetPos);
+ return result;
}
@Override
@@ -587,59 +685,59 @@ public class CryptoInputStream extends FilterInputStream implements
EnumSet<ReadOption> opts) throws IOException,
UnsupportedOperationException {
checkStream();
- try {
- if (outBuffer.remaining() > 0) {
- // Have some decrypted data unread, need to reset.
- ((Seekable) in).seek(getPos());
- resetStreamOffset(getPos());
+ if (outBuffer.remaining() > 0) {
+ if (!(in instanceof Seekable)) {
+ throw new UnsupportedOperationException("This stream does not " +
+ "support seek.");
}
- final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
- read(bufferPool, maxLength, opts);
- if (buffer != null) {
- final int n = buffer.remaining();
- if (n > 0) {
- streamOffset += buffer.remaining(); // Read n bytes
- final int pos = buffer.position();
- decrypt(buffer, n, pos);
- }
- }
- return buffer;
- } catch (ClassCastException e) {
- throw new UnsupportedOperationException("This stream does not support " +
+ // Have some decrypted data unread, need to reset.
+ ((Seekable) in).seek(getPos());
+ resetStreamOffset(getPos());
+ }
+ if (!(in instanceof HasEnhancedByteBufferAccess)) {
+ throw new UnsupportedOperationException("This stream does not support " +
"enhanced byte buffer access.");
}
+ final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
+ read(bufferPool, maxLength, opts);
+ if (buffer != null) {
+ final int n = buffer.remaining();
+ if (n > 0) {
+ streamOffset += buffer.remaining(); // Read n bytes
+ final int pos = buffer.position();
+ decrypt(buffer, n, pos);
+ }
+ }
+ return buffer;
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
- try {
- ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
- } catch (ClassCastException e) {
+    if (!(in instanceof HasEnhancedByteBufferAccess)) {
throw new UnsupportedOperationException("This stream does not support " +
"release buffer.");
}
+ ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
}
@Override
public void setReadahead(Long readahead) throws IOException,
UnsupportedOperationException {
- try {
- ((CanSetReadahead) in).setReadahead(readahead);
- } catch (ClassCastException e) {
+    if (!(in instanceof CanSetReadahead)) {
throw new UnsupportedOperationException("This stream does not support " +
"setting the readahead caching strategy.");
}
+ ((CanSetReadahead) in).setReadahead(readahead);
}
@Override
public void setDropBehind(Boolean dropCache) throws IOException,
UnsupportedOperationException {
- try {
- ((CanSetDropBehind) in).setDropBehind(dropCache);
- } catch (ClassCastException e) {
+    if (!(in instanceof CanSetDropBehind)) {
throw new UnsupportedOperationException("This stream does not " +
"support setting the drop-behind caching setting.");
}
+ ((CanSetDropBehind) in).setDropBehind(dropCache);
}
@Override
@@ -737,11 +835,17 @@ public class CryptoInputStream extends FilterInputStream implements
@Override
public boolean hasCapability(String capability) {
switch (StringUtils.toLowerCase(capability)) {
+ case StreamCapabilities.UNBUFFER:
+ return true;
case StreamCapabilities.READAHEAD:
case StreamCapabilities.DROPBEHIND:
- case StreamCapabilities.UNBUFFER:
case StreamCapabilities.READBYTEBUFFER:
- return true;
+ case StreamCapabilities.PREADBYTEBUFFER:
+ if (!(in instanceof StreamCapabilities)) {
+ throw new UnsupportedOperationException("This stream does not expose " +
+ "its stream capabilities.");
+ }
+ return ((StreamCapabilities) in).hasCapability(capability);
default:
return false;
}
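The Javadoc above describes the new decrypt(long, ByteBuffer, int, int)
chunk-by-chunk pattern; a standalone sketch of just the buffer-windowing
part may help. This is not the committed code: a XOR stands in for the
AES-CTR Decryptor, and the buffer pool, IV, and padding bookkeeping are
omitted.

    import java.nio.ByteBuffer;

    public class InPlaceChunkTransform {
      // Transforms length bytes of buf starting at start, in place, staging
      // the work through a small scratch buffer. buf.position() and
      // buf.limit() are untouched because we only move a duplicate.
      static void transform(ByteBuffer buf, int start, int length) {
        ByteBuffer window = buf.duplicate();  // shares content, not position
        ByteBuffer scratch = ByteBuffer.allocate(8); // small, to force chunking
        int done = 0;
        while (done < length) {
          int chunk = Math.min(length - done, scratch.capacity());
          // Window over the next unprocessed chunk and copy it out.
          window.limit(start + done + chunk);
          window.position(start + done);
          scratch.clear();
          scratch.put(window);
          scratch.flip();
          // Write the transformed bytes back over the same region.
          window.limit(start + length);
          window.position(start + done);
          while (scratch.hasRemaining()) {
            window.put((byte) (scratch.get() ^ 0x5A)); // stand-in "decryption"
          }
          done += chunk;
        }
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("hello, chunked world".getBytes());
        transform(buf, 0, buf.remaining());
        transform(buf, 0, buf.remaining()); // XOR twice restores the input
        System.out.println(new String(buf.array()));
      }
    }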
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
new file mode 100644
index 0000000..d99ee16
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementers of this interface provide a positioned read API that writes to a
+ * {@link ByteBuffer} rather than a {@code byte[]}.
+ *
+ * @see PositionedReadable
+ * @see ByteBufferReadable
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ByteBufferPositionedReadable {
+ /**
+ * Reads up to {@code buf.remaining()} bytes into buf from a given position
+ * in the file and returns the number of bytes read. Callers should use
+ * {@code buf.limit(...)} to control the size of the desired read and
+ * {@code buf.position(...)} to control the offset into the buffer the data
+ * should be written to.
+ * <p>
+ * After a successful call, {@code buf.position()} will be advanced by the
+ * number of bytes read and {@code buf.limit()} will be unchanged.
+ * <p>
+ * In the case of an exception, the state of the buffer (the contents of the
+ * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+ * undefined, and callers should be prepared to recover from this
+ * eventuality.
+ * <p>
+ * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+ * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
+ * stream supports this interface, otherwise they might get a
+ * {@link UnsupportedOperationException}.
+ * <p>
+ * Implementations should treat 0-length requests as legitimate, and must not
+ * signal an error upon their receipt.
+ *
+ * @param position position within file
+ * @param buf the ByteBuffer to receive the results of the read operation.
+ * @return the number of bytes read, possibly zero, or -1 if reached
+ * end-of-stream
+ * @throws IOException if there is some error performing the read
+ */
+ int read(long position, ByteBuffer buf) throws IOException;
+}
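Since read() may return fewer bytes than requested, a caller that needs the
buffer filled has to loop. A sketch relying only on the contract spelled out
in the Javadoc above (the helper itself is hypothetical, not part of this
patch):

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.ByteBufferPositionedReadable;

    public class PreadFully {
      // read() advances buf.position() by the bytes read, leaves
      // buf.limit() unchanged, and returns -1 at end-of-stream.
      static void readFully(ByteBufferPositionedReadable in, long position,
          ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
          int n = in.read(position, buf);
          if (n < 0) {
            throw new EOFException("Hit EOF at " + position + " with " +
                buf.remaining() + " bytes still wanted");
          }
          position += n; // resume the next pread where this one stopped
        }
      }
    }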
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
index 926b554..5a4ae04 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.fs;
import java.io.IOException;
import java.nio.ByteBuffer;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -33,16 +34,18 @@ public interface ByteBufferReadable {
* Reads up to buf.remaining() bytes into buf. Callers should use
* buf.limit(..) to control the size of the desired read.
* <p>
- * After a successful call, buf.position() will be advanced by the number
- * of bytes read and buf.limit() should be unchanged.
+ * After a successful call, {@code buf.position()} will be advanced by the
+ * number of bytes read and {@code buf.limit()} will be unchanged.
* <p>
- * In the case of an exception, the values of buf.position() and buf.limit()
- * are undefined, and callers should be prepared to recover from this
+ * In the case of an exception, the state of the buffer (the contents of the
+ * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+ * undefined, and callers should be prepared to recover from this
* eventuality.
* <p>
- * Many implementations will throw {@link UnsupportedOperationException}, so
- * callers that are not confident in support for this method from the
- * underlying filesystem should be prepared to handle that exception.
+ * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+ * {@link StreamCapabilities#READBYTEBUFFER} to check if the underlying
+ * stream supports this interface, otherwise they might get a
+ * {@link UnsupportedOperationException}.
* <p>
* Implementations should treat 0-length requests as legitimate, and must not
* signal an error upon their receipt.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 62c45f1..066cc3d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -39,7 +39,8 @@ import org.apache.hadoop.util.IdentityHashStore;
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
- HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+ HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+ ByteBufferPositionedReadable {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
@@ -148,7 +149,8 @@ public class FSDataInputStream extends DataInputStream
return ((ByteBufferReadable)in).read(buf);
}
- throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ throw new UnsupportedOperationException("Byte-buffer read unsupported " +
+ "by input stream");
}
@Override
@@ -247,4 +249,13 @@ public class FSDataInputStream extends DataInputStream
public String toString() {
return super.toString() + ": " + in;
}
+
+ @Override
+ public int read(long position, ByteBuffer buf) throws IOException {
+ if (in instanceof ByteBufferPositionedReadable) {
+ return ((ByteBufferPositionedReadable) in).read(position, buf);
+ }
+ throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
+ "by input stream");
+ }
}
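The instanceof-based delegation above suggests the caller-side mirror of the
same idea: use the ByteBuffer pread where the capability is present, and
fall back to the byte[] overload elsewhere. A hypothetical helper, sketched
against the APIs added here plus the pre-existing PositionedReadable pread:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class PreadWithFallback {
      static int pread(FSDataInputStream in, long pos, ByteBuffer buf)
          throws IOException {
        if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
          return in.read(pos, buf); // ByteBuffer path added by this change
        }
        // Fallback: stage through a heap array with the classic pread,
        // then copy into the caller's buffer.
        byte[] tmp = new byte[buf.remaining()];
        int n = in.read(pos, tmp, 0, tmp.length);
        if (n > 0) {
          buf.put(tmp, 0, n);
        }
        return n;
      }
    }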
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index c52d307..e68e7b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -66,6 +66,12 @@ public interface StreamCapabilities {
String READBYTEBUFFER = "in:readbytebuffer";
/**
+ * Stream read(long, ByteBuffer) capability implemented by
+ * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
+ */
+ String PREADBYTEBUFFER = "in:preadbytebuffer";
+
+ /**
* Capabilities that a stream can support and be queried for.
*/
@Deprecated
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
index a0eb105..7463d6c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
import java.util.EnumSet;
import java.util.Random;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -129,6 +130,32 @@ public abstract class CryptoStreamsTestBase {
Assert.assertArrayEquals(result, expectedData);
}
+ private int byteBufferPreadAll(ByteBufferPositionedReadable in,
+ ByteBuffer buf) throws IOException {
+ int n = 0;
+ int total = 0;
+ while (n != -1) {
+ total += n;
+ if (!buf.hasRemaining()) {
+ break;
+ }
+ n = in.read(total, buf);
+ }
+
+ return total;
+ }
+
+ private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
+ throws Exception {
+ ByteBuffer result = ByteBuffer.allocate(dataLen);
+ int n = byteBufferPreadAll(in, result);
+
+ Assert.assertEquals(dataLen, n);
+ ByteBuffer expectedData = ByteBuffer.allocate(n);
+ expectedData.put(data, 0, n);
+ Assert.assertArrayEquals(result.array(), expectedData.array());
+ }
+
protected OutputStream getOutputStream(int bufferSize) throws IOException {
return getOutputStream(bufferSize, key, iv);
}
@@ -288,20 +315,36 @@ public abstract class CryptoStreamsTestBase {
return total;
}
+
+ private int readAll(InputStream in, long pos, ByteBuffer buf)
+ throws IOException {
+ int n = 0;
+ int total = 0;
+ while (n != -1) {
+ total += n;
+ if (!buf.hasRemaining()) {
+ break;
+ }
+ n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
+ }
+
+ return total;
+ }
/** Test positioned read. */
@Test(timeout=120000)
public void testPositionedRead() throws Exception {
- OutputStream out = getOutputStream(defaultBufferSize);
- writeData(out);
+ try (OutputStream out = getOutputStream(defaultBufferSize)) {
+ writeData(out);
+ }
- InputStream in = getInputStream(defaultBufferSize);
- // Pos: 1/3 dataLen
- positionedReadCheck(in , dataLen / 3);
+ try (InputStream in = getInputStream(defaultBufferSize)) {
+ // Pos: 1/3 dataLen
+ positionedReadCheck(in, dataLen / 3);
- // Pos: 1/2 dataLen
- positionedReadCheck(in, dataLen / 2);
- in.close();
+ // Pos: 1/2 dataLen
+ positionedReadCheck(in, dataLen / 2);
+ }
}
private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -315,6 +358,35 @@ public abstract class CryptoStreamsTestBase {
System.arraycopy(data, pos, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
}
+
+ /** Test positioned read with ByteBuffers. */
+ @Test(timeout=120000)
+ public void testPositionedReadWithByteBuffer() throws Exception {
+ try (OutputStream out = getOutputStream(defaultBufferSize)) {
+ writeData(out);
+ }
+
+ try (InputStream in = getInputStream(defaultBufferSize)) {
+ // Pos: 1/3 dataLen
+ positionedReadCheckWithByteBuffer(in, dataLen / 3);
+
+ // Pos: 1/2 dataLen
+ positionedReadCheckWithByteBuffer(in, dataLen / 2);
+ }
+ }
+
+ private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
+ throws Exception {
+ ByteBuffer result = ByteBuffer.allocate(dataLen);
+ int n = readAll(in, pos, result);
+
+ Assert.assertEquals(dataLen, n + pos);
+ byte[] readData = new byte[n];
+ System.arraycopy(result.array(), 0, readData, 0, n);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, pos, expectedData, 0, n);
+ Assert.assertArrayEquals(readData, expectedData);
+ }
/** Test read fully */
@Test(timeout=120000)
@@ -505,12 +577,40 @@ public abstract class CryptoStreamsTestBase {
System.arraycopy(data, 0, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
}
+
+ private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
+ int bufPos) throws Exception {
+ // Test reading from position 0
+ buf.position(bufPos);
+ int n = ((ByteBufferPositionedReadable) in).read(0, buf);
+ Assert.assertEquals(bufPos + n, buf.position());
+ byte[] readData = new byte[n];
+ buf.rewind();
+ buf.position(bufPos);
+ buf.get(readData);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, 0, expectedData, 0, n);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ // Test reading from half way through the data
+ buf.position(bufPos);
+ n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
+ Assert.assertEquals(bufPos + n, buf.position());
+ readData = new byte[n];
+ buf.rewind();
+ buf.position(bufPos);
+ buf.get(readData);
+ expectedData = new byte[n];
+ System.arraycopy(data, dataLen / 2, expectedData, 0, n);
+ Assert.assertArrayEquals(readData, expectedData);
+ }
/** Test byte buffer read with different buffer size. */
@Test(timeout=120000)
public void testByteBufferRead() throws Exception {
- OutputStream out = getOutputStream(defaultBufferSize);
- writeData(out);
+ try (OutputStream out = getOutputStream(defaultBufferSize)) {
+ writeData(out);
+ }
// Default buffer size, initial buffer position is 0
InputStream in = getInputStream(defaultBufferSize);
@@ -560,6 +660,53 @@ public abstract class CryptoStreamsTestBase {
byteBufferReadCheck(in, buf, 11);
in.close();
}
+
+ /** Test byte buffer pread with different buffer size. */
+ @Test(timeout=120000)
+ public void testByteBufferPread() throws Exception {
+ try (OutputStream out = getOutputStream(defaultBufferSize)) {
+ writeData(out);
+ }
+
+ try (InputStream defaultBuf = getInputStream(defaultBufferSize);
+ InputStream smallBuf = getInputStream(smallBufferSize)) {
+
+ ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
+
+ // Default buffer size, initial buffer position is 0
+ byteBufferPreadCheck(defaultBuf, buf, 0);
+
+ // Default buffer size, initial buffer position is not 0
+ buf.clear();
+ byteBufferPreadCheck(defaultBuf, buf, 11);
+
+ // Small buffer size, initial buffer position is 0
+ buf.clear();
+ byteBufferPreadCheck(smallBuf, buf, 0);
+
+ // Small buffer size, initial buffer position is not 0
+ buf.clear();
+ byteBufferPreadCheck(smallBuf, buf, 11);
+
+ // Test with direct ByteBuffer
+ buf = ByteBuffer.allocateDirect(dataLen + 100);
+
+ // Direct buffer, default buffer size, initial buffer position is 0
+ byteBufferPreadCheck(defaultBuf, buf, 0);
+
+ // Direct buffer, default buffer size, initial buffer position is not 0
+ buf.clear();
+ byteBufferPreadCheck(defaultBuf, buf, 11);
+
+ // Direct buffer, small buffer size, initial buffer position is 0
+ buf.clear();
+ byteBufferPreadCheck(smallBuf, buf, 0);
+
+ // Direct buffer, small buffer size, initial buffer position is not 0
+ buf.clear();
+ byteBufferPreadCheck(smallBuf, buf, 11);
+ }
+ }
@Test(timeout=120000)
public void testCombinedOp() throws Exception {
@@ -797,5 +944,23 @@ public abstract class CryptoStreamsTestBase {
// The close will be called when exiting this try-with-resource block
}
}
+
+ // Test ByteBuffer pread
+ try (InputStream in = getInputStream(smallBufferSize)) {
+ if (in instanceof ByteBufferPositionedReadable) {
+ ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;
+
+ // Test unbuffer after pread
+ byteBufferPreadCheck(bbpin);
+ ((CanUnbuffer) in).unbuffer();
+
+ // Test pread again after unbuffer
+ byteBufferPreadCheck(bbpin);
+
+ // Test close after unbuffer
+ ((CanUnbuffer) in).unbuffer();
+ // The close will be called when exiting this try-with-resource block
+ }
+ }
}
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
index cd7391a..8bcf46e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
@@ -180,7 +181,7 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
implements Seekable, PositionedReadable, ByteBufferReadable,
HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer,
- StreamCapabilities {
+ StreamCapabilities, ByteBufferPositionedReadable {
private final byte[] oneByteBuf = new byte[1];
private int pos = 0;
private final byte[] data;
@@ -304,6 +305,32 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
}
@Override
+ public int read(long position, ByteBuffer buf) throws IOException {
+ if (buf == null) {
+ throw new NullPointerException();
+ } else if (!buf.hasRemaining()) {
+ return 0;
+ }
+
+ if (position > length) {
+ throw new IOException("Cannot read after EOF.");
+ }
+ if (position < 0) {
+ throw new IOException("Cannot read to negative offset.");
+ }
+
+ checkStream();
+
+ if (position < length) {
+ int n = (int) Math.min(buf.remaining(), length - position);
+ buf.put(data, (int) position, n);
+ return n;
+ }
+
+ return -1;
+ }
+
+ @Override
public void readFully(long position, byte[] b, int off, int len)
throws IOException {
if (b == null) {
@@ -378,6 +405,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
case StreamCapabilities.READAHEAD:
case StreamCapabilities.DROPBEHIND:
case StreamCapabilities.UNBUFFER:
+ case StreamCapabilities.READBYTEBUFFER:
+ case StreamCapabilities.PREADBYTEBUFFER:
return true;
default:
return false;
@@ -439,7 +468,9 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
new String[] {
StreamCapabilities.DROPBEHIND,
StreamCapabilities.READAHEAD,
- StreamCapabilities.UNBUFFER
+ StreamCapabilities.UNBUFFER,
+ StreamCapabilities.READBYTEBUFFER,
+ StreamCapabilities.PREADBYTEBUFFER
},
new String[] {
StreamCapabilities.HFLUSH,
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
index bb3fd7a..e7d922e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
@@ -90,11 +90,21 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
@Override
@Test(timeout=10000)
public void testByteBufferRead() throws Exception {}
+
+ @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+ @Override
+ @Test(timeout=10000)
+ public void testPositionedReadWithByteBuffer() throws IOException {}
@Ignore("ChecksumFSOutputSummer doesn't support Syncable")
@Override
@Test(timeout=10000)
public void testSyncable() throws IOException {}
+
+ @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+ @Override
+ @Test(timeout=10000)
+ public void testByteBufferPread() throws IOException {}
@Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
@Override
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
index 7e30077..036706f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
@@ -91,6 +91,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
@Test(timeout=10000)
public void testPositionedRead() throws IOException {}
+ @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+ @Override
+ @Test(timeout=10000)
+ public void testPositionedReadWithByteBuffer() throws IOException {}
+
@Ignore("Wrapped stream doesn't support ReadFully")
@Override
@Test(timeout=10000)
@@ -105,6 +110,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
@Override
@Test(timeout=10000)
public void testByteBufferRead() throws IOException {}
+
+ @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+ @Override
+ @Test(timeout=10000)
+ public void testByteBufferPread() throws IOException {}
@Ignore("Wrapped stream doesn't support ByteBufferRead, Seek")
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index a3e2ad5..2262295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.fs.CanSetDropBehind;
@@ -84,7 +85,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;
-import org.apache.htrace.core.SpanId;
import com.google.common.annotations.VisibleForTesting;
@@ -99,7 +99,8 @@ import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
- HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+ HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+ ByteBufferPositionedReadable {
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1561,6 +1562,14 @@ public class DFSInputStream extends FSInputStream
throw new IOException("Mark/reset not supported");
}
+ @Override
+ public int read(long position, final ByteBuffer buf) throws IOException {
+ if (!buf.hasRemaining()) {
+ return 0;
+ }
+ return pread(position, buf);
+ }
+
/** Utility class to encapsulate data node info and its address. */
static final class DNAddrPair {
final DatanodeInfo info;
@@ -1780,6 +1789,7 @@ public class DFSInputStream extends FSInputStream
case StreamCapabilities.DROPBEHIND:
case StreamCapabilities.UNBUFFER:
case StreamCapabilities.READBYTEBUFFER:
+ case StreamCapabilities.PREADBYTEBUFFER:
return true;
default:
return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
index 0eab9a6..f003263 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h
@@ -50,6 +50,24 @@ extern "C" {
void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
/**
+ * Determine if a file is using the "direct pread" optimization.
+ *
+ * @param file The HDFS file
+ * @return 1 if the file is using the direct pread optimization,
+ * 0 otherwise.
+ */
+ int hdfsFileUsesDirectPread(struct hdfsFile_internal *file);
+
+ /**
+ * Disable the direct pread optimization for a file.
+ *
+ * This is mainly provided for unit testing purposes.
+ *
+ * @param file The HDFS file
+ */
+ void hdfsFileDisableDirectPread(struct hdfsFile_internal *file);
+
+ /**
* Disable domain socket security checks.
*
* @return 0 if domain socket security was disabled;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
index 1cd497b..ebf0dd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
@@ -88,9 +88,9 @@ int main(int argc, char **argv) {
const char *userPath = "/tmp/usertestfile.txt";
char buffer[32], buffer2[256], rdbuffer[32];
- tSize num_written_bytes, num_read_bytes;
+ tSize num_written_bytes, num_read_bytes, num_pread_bytes;
hdfsFS fs, lfs;
- hdfsFile writeFile, readFile, localFile, appendFile, userFile;
+ hdfsFile writeFile, readFile, preadFile, localFile, appendFile, userFile;
tOffset currentPos, seekPos;
int exists, totalResult, result, numEntries, i, j;
const char *resp;
@@ -145,7 +145,7 @@ int main(int argc, char **argv) {
}
{
- //Write tests
+ // Write tests
writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
if(!writeFile) {
@@ -188,7 +188,7 @@ int main(int argc, char **argv) {
}
{
- //Read tests
+ // Read tests
exists = hdfsExists(fs, readPath);
@@ -250,6 +250,7 @@ int main(int argc, char **argv) {
}
fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
num_read_bytes, buffer);
+ memset(buffer, 0, strlen(fileContents) + 1);
if (hdfsSeek(fs, readFile, 0L)) {
fprintf(stderr, "Failed to seek to file start!\n");
shutdown_and_exit(cl, -1);
@@ -259,17 +260,27 @@ int main(int argc, char **argv) {
// read path
hdfsFileDisableDirectRead(readFile);
- num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
- sizeof(buffer));
- fprintf(stderr, "Read following %d bytes:\n%s\n",
- num_read_bytes, buffer);
+ if (hdfsFileUsesDirectRead(readFile)) {
+ fprintf(stderr, "Disabled direct reads, but it is still enabled");
+ shutdown_and_exit(cl, -1);
+ }
- memset(buffer, 0, strlen(fileContents + 1));
+ if (!hdfsFileUsesDirectPread(readFile)) {
+ fprintf(stderr, "Disabled direct reads, but direct preads was "
+ "disabled as well");
+ shutdown_and_exit(cl, -1);
+ }
- num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer,
+ num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
sizeof(buffer));
- fprintf(stderr, "Read following %d bytes:\n%s\n",
+ if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+ fprintf(stderr, "Failed to read. Expected %s but got %s (%d bytes)\n",
+ fileContents, buffer, num_read_bytes);
+ shutdown_and_exit(cl, -1);
+ }
+ fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer);
+ memset(buffer, 0, strlen(fileContents) + 1);
hdfsCloseFile(fs, readFile);
@@ -295,6 +306,115 @@ int main(int argc, char **argv) {
hdfsCloseFile(lfs, localFile);
}
+ {
+ // Pread tests
+
+ exists = hdfsExists(fs, readPath);
+
+ if (exists) {
+ fprintf(stderr, "Failed to validate existence of %s\n", readPath);
+ shutdown_and_exit(cl, -1);
+ }
+
+ preadFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0);
+ if (!preadFile) {
+ fprintf(stderr, "Failed to open %s for reading!\n", readPath);
+ shutdown_and_exit(cl, -1);
+ }
+
+ if (!hdfsFileIsOpenForRead(preadFile)) {
+ fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file "
+ "with O_RDONLY, and it did not show up as 'open for "
+ "read'\n");
+ shutdown_and_exit(cl, -1);
+ }
+
+ fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, preadFile));
+
+ num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+ if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+ fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+ fileContents, buffer, num_pread_bytes);
+ shutdown_and_exit(cl, -1);
+ }
+ fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n",
+ num_pread_bytes, buffer);
+ memset(buffer, 0, strlen(fileContents) + 1);
+ if (hdfsTell(fs, preadFile) != 0) {
+ fprintf(stderr, "Pread changed position of file\n");
+ shutdown_and_exit(cl, -1);
+ }
+
+ // Test pread midway through the file rather than at the beginning
+ const char *fileContentsChunk = "World!";
+ num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+ if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+ fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+ fileContentsChunk, buffer, num_pread_bytes);
+ shutdown_and_exit(cl, -1);
+ }
+ fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer);
+ memset(buffer, 0, strlen(fileContents) + 1);
+ if (hdfsTell(fs, preadFile) != 0) {
+ fprintf(stderr, "Pread changed position of file\n");
+ shutdown_and_exit(cl, -1);
+ }
+
+ // Disable the direct pread path so that we really go through the slow
+ // read path
+ hdfsFileDisableDirectPread(preadFile);
+
+ if (hdfsFileUsesDirectPread(preadFile)) {
+ fprintf(stderr, "Disabled direct preads, but it is still enabled");
+ shutdown_and_exit(cl, -1);
+ }
+
+ if (!hdfsFileUsesDirectRead(preadFile)) {
+ fprintf(stderr, "Disabled direct preads, but direct read was "
+ "disabled as well");
+ shutdown_and_exit(cl, -1);
+ }
+
+ num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+ if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+ fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n",
+ fileContents, buffer, num_pread_bytes);
+ shutdown_and_exit(cl, -1);
+ }
+ fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer);
+ memset(buffer, 0, strlen(fileContents) + 1);
+ if (hdfsTell(fs, preadFile) != 0) {
+ fprintf(stderr, "Pread changed position of file\n");
+ shutdown_and_exit(cl, -1);
+ }
+
+ num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+ if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+ fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+ fileContentsChunk, buffer, num_read_bytes);
+ shutdown_and_exit(cl, -1);
+ }
+ fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer);
+ memset(buffer, 0, strlen(fileContents) + 1);
+ if (hdfsTell(fs, preadFile) != 0) {
+ fprintf(stderr, "Pread changed position of file\n");
+ shutdown_and_exit(cl, -1);
+ }
+
+ hdfsCloseFile(fs, preadFile);
+
+ // Test correct behaviour for unsupported filesystems
+ localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0);
+
+ if (hdfsFileUsesDirectPread(localFile)) {
+ fprintf(stderr, "Direct pread support incorrectly detected for local "
+ "filesystem\n");
+ shutdown_and_exit(cl, -1);
+ }
+
+ hdfsCloseFile(lfs, localFile);
+ }
+
totalResult = 0;
result = 0;
{
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
index e212f21..2fd0592 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
@@ -40,8 +40,23 @@
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
+#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
+/**
+ * Reads bytes using the read(ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes onto the Java heap.
+ * Instead the data will be directly copied from kernel space to the C heap.
+ */
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
+
+/**
+ * Reads bytes using the read(long, ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes onto the Java heap.
+ * Instead the data will be directly copied from kernel space to the C heap.
+ */
+tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
+ tSize length);
+
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
/**
@@ -285,7 +300,7 @@ int hdfsFileIsOpenForWrite(hdfsFile file)
int hdfsFileUsesDirectRead(hdfsFile file)
{
- return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
+ return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) != 0;
}
void hdfsFileDisableDirectRead(hdfsFile file)
@@ -293,6 +308,17 @@ void hdfsFileDisableDirectRead(hdfsFile file)
file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}
+int hdfsFileUsesDirectPread(hdfsFile file)
+{
+ return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) != 0;
+}
+
+void hdfsFileDisableDirectPread(hdfsFile file)
+{
+ file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_PREAD;
+}
+
+
int hdfsDisableDomainSocketSecurity(void)
{
jthrowable jthr;
@@ -985,6 +1011,62 @@ int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
return 0;
}
+/**
+ * Delegates to FsDataInputStream#hasCapability(String). Used to check if a
+ * given input stream supports certain methods, such as
+ * ByteBufferReadable#read(ByteBuffer).
+ *
+ * @param jFile the FsDataInputStream to call hasCapability on
+ * @param capability the name of the capability to query; for a full list of
+ * possible values see StreamCapabilities
+ *
+ * @return true if the given jFile has the given capability, false otherwise
+ *
+ * @see org.apache.hadoop.fs.StreamCapabilities
+ */
+static int hdfsHasStreamCapability(jobject jFile,
+ const char *capability) {
+ int ret = 0;
+ jthrowable jthr = NULL;
+ jvalue jVal;
+ jstring jCapabilityString = NULL;
+
+ /* Get the JNIEnv* corresponding to current thread */
+ JNIEnv* env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return 0;
+ }
+
+ jthr = newJavaStr(env, capability, &jCapabilityString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsHasStreamCapability(%s): newJavaStr", capability);
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
+ JC_FS_DATA_INPUT_STREAM, "hasCapability", "(Ljava/lang/String;)Z",
+ jCapabilityString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability",
+ capability);
+ goto done;
+ }
+
+done:
+ destroyLocalReference(env, jthr);
+ destroyLocalReference(env, jCapabilityString);
+ if (ret) {
+ errno = ret;
+ return 0;
+ }
+ if (jVal.z == JNI_TRUE) {
+ return 1;
+ }
+ return 0;
+}
+
static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
int32_t bufferSize, int16_t replication, int64_t blockSize)
{
@@ -995,7 +1077,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
return f{is|os};
*/
int accmode = flags & O_ACCMODE;
- jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL;
+ jstring jStrBufferSize = NULL, jStrReplication = NULL;
jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
jobject jFS = (jobject)fs;
jthrowable jthr;
@@ -1024,13 +1106,15 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
errno = ENOTSUP;
return NULL;
} else {
- fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode);
+ fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n",
+ accmode);
errno = EINVAL;
return NULL;
}
if ((flags & O_CREAT) && (flags & O_EXCL)) {
- fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
+ fprintf(stderr,
+ "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
}
if (accmode == O_RDONLY) {
@@ -1153,34 +1237,26 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
file->flags = 0;
if ((flags & O_WRONLY) == 0) {
- // Check the StreamCapabilities of jFile to see if we can do direct reads
- jthr = newJavaStr(env, "in:readbytebuffer", &jCapabilityString);
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsOpenFile(%s): newJavaStr", path);
- goto done;
- }
- jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
- JC_FS_DATA_INPUT_STREAM, "hasCapability",
- "(Ljava/lang/String;)Z", jCapabilityString);
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsOpenFile(%s): FSDataInputStream#hasCapability", path);
- goto done;
- }
- if (jVal.z) {
+ // Check the StreamCapabilities of jFile to see if we can do direct
+ // reads
+ if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
}
+
+ // Check the StreamCapabilities of jFile to see if we can do direct
+ // preads
+ if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
+ file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
+ }
}
ret = 0;
done:
destroyLocalReference(env, jStrBufferSize);
destroyLocalReference(env, jStrReplication);
- destroyLocalReference(env, jConfiguration);
- destroyLocalReference(env, jPath);
+ destroyLocalReference(env, jConfiguration);
+ destroyLocalReference(env, jPath);
destroyLocalReference(env, jFile);
- destroyLocalReference(env, jCapabilityString);
if (ret) {
if (file) {
if (file->file) {
@@ -1385,6 +1461,13 @@ static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
return 0;
}
+/**
+ * If the underlying stream supports the ByteBufferReadable interface then
+ * this method will transparently use read(ByteBuffer). This can help
+ * improve performance as it avoids unnecessarily copying data on to the Java
+ * heap. Instead the data will be directly copied from kernel space to the C
+ * heap.
+ */
tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
jobject jInputStream;
@@ -1459,12 +1542,11 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
return jVal.i;
}
-// Reads using the read(ByteBuffer) API, which does fewer copies
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
// JAVA EQUIVALENT:
- // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
- // fis.read(bbuffer);
+ // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+ // fis.read(buf);
jobject jInputStream;
jvalue jVal;
@@ -1499,9 +1581,25 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
"readDirect: FSDataInputStream#read");
return -1;
}
- return (jVal.i < 0) ? 0 : jVal.i;
+ // Reached EOF, return 0
+ if (jVal.i < 0) {
+ return 0;
+ }
+ // 0 bytes read, return error
+ if (jVal.i == 0) {
+ errno = EINTR;
+ return -1;
+ }
+ return jVal.i;
}
+/**
+ * If the underlying stream supports the ByteBufferPositionedReadable
+ * interface then this method will transparently use read(long, ByteBuffer).
+ * This can help improve performance as it avoids unnecessarily copying data
+ * on to the Java heap. Instead the data will be directly copied from kernel
+ * space to the C heap.
+ */
tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
void* buffer, tSize length)
{
@@ -1521,6 +1619,10 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
return -1;
}
+ if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
+ return preadDirect(fs, f, position, buffer, length);
+ }
+
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
@@ -1571,6 +1673,60 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
return jVal.i;
}
+tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
+ tSize length)
+{
+ // JAVA EQUIVALENT:
+ // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+ // fis.read(position, buf);
+
+ jvalue jVal;
+ jthrowable jthr;
+ jobject bb;
+
+ //Get the JNIEnv* corresponding to current thread
+ JNIEnv* env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+
+ //Error checking... make sure that this file is 'readable'
+ if (f->type != HDFS_STREAM_INPUT) {
+ fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+ errno = EINVAL;
+ return -1;
+ }
+
+ //Read the requisite bytes
+ bb = (*env)->NewDirectByteBuffer(env, buffer, length);
+ if (bb == NULL) {
+ errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+ "readDirect: NewDirectByteBuffer");
+ return -1;
+ }
+
+ jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
+ JC_FS_DATA_INPUT_STREAM, "read", "(JLjava/nio/ByteBuffer;)I",
+ position, bb);
+ destroyLocalReference(env, bb);
+ if (jthr) {
+ errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "preadDirect: FSDataInputStream#read");
+ return -1;
+ }
+ // Reached EOF, return 0
+ if (jVal.i < 0) {
+ return 0;
+ }
+    // 0 bytes read: report EINTR so callers can retry the transient case
+ if (jVal.i == 0) {
+ errno = EINTR;
+ return -1;
+ }
+ return jVal.i;
+}
+
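A property worth remembering when mixing the two APIs: positional reads,
direct or not, leave the stream's sequential offset untouched, so hdfsPread
can be interleaved freely with hdfsRead. A small sanity-check sketch, with an
arbitrary probe offset:

    #include <assert.h>
    #include "hdfs.h"

    /* Confirm a pread does not disturb the position used by hdfsRead. */
    void check_pread_leaves_offset_alone(hdfsFS fs, hdfsFile f)
    {
        char scratch[128];
        tOffset before = hdfsTell(fs, f);
        (void)hdfsPread(fs, f, 4096, scratch, sizeof(scratch));
        assert(hdfsTell(fs, f) == before);
    }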
tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
{
// JAVA EQUIVALENT
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
new file mode 100644
index 0000000..4547db1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the DFS positional read functionality on a single node
+ * mini-cluster. These tests are inspired by {@link TestPread}. The tests
+ * are much less comprehensive than other pread tests because pread already
+ * internally uses {@link ByteBuffer}s.
+ */
+public class TestByteBufferPread {
+
+ private static MiniDFSCluster cluster;
+ private static FileSystem fs;
+ private static byte[] fileContents;
+ private static Path testFile;
+ private static Random rand;
+
+ private static final long SEED = 0xDEADBEEFL;
+ private static final int BLOCK_SIZE = 4096;
+ private static final int FILE_SIZE = 12 * BLOCK_SIZE;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+    // Set up the cluster with a small block size so we can create small files
+ // that span multiple blocks
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ fs = cluster.getFileSystem();
+
+ // Create a test file that spans 12 blocks, and contains a bunch of random
+ // bytes
+ fileContents = new byte[FILE_SIZE];
+ rand = new Random(SEED);
+ rand.nextBytes(fileContents);
+ testFile = new Path("/byte-buffer-pread-test.dat");
+ try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
+ out.write(fileContents);
+ }
+ }
+
+ /**
+   * Test preads with on-heap {@link ByteBuffer}s.
+ */
+ @Test
+ public void testPreadWithHeapByteBuffer() throws IOException {
+ testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+ testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+ testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+ testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+ testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+ }
+
+ /**
+   * Test preads with direct {@link ByteBuffer}s.
+ */
+ @Test
+ public void testPreadWithDirectByteBuffer() throws IOException {
+ testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+ testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+ testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+ testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+ testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+ }
+
+ /**
+ * Reads the entire testFile using the pread API and validates that its
+ * contents are properly loaded into the supplied {@link ByteBuffer}.
+ */
+ private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
+ int bytesRead;
+ int totalBytesRead = 0;
+ try (FSDataInputStream in = fs.open(testFile)) {
+ while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+ totalBytesRead += bytesRead;
+ // Check that each call to read changes the position of the ByteBuffer
+ // correctly
+ assertEquals(totalBytesRead, buffer.position());
+ }
+
+ // Make sure the buffer is full
+ assertFalse(buffer.hasRemaining());
+ // Make sure the contents of the read buffer equal the contents of the
+ // file
+ buffer.position(0);
+ byte[] bufferContents = new byte[FILE_SIZE];
+ buffer.get(bufferContents);
+ assertArrayEquals(bufferContents, fileContents);
+ buffer.position(buffer.limit());
+ }
+ }
+
+ /**
+ * Attempts to read the testFile into a {@link ByteBuffer} that is already
+ * full, and validates that doing so does not change the contents of the
+ * supplied {@link ByteBuffer}.
+ */
+ private void testPreadWithFullByteBuffer(ByteBuffer buffer)
+ throws IOException {
+ // Load some dummy data into the buffer
+ byte[] existingBufferBytes = new byte[FILE_SIZE];
+ rand.nextBytes(existingBufferBytes);
+ buffer.put(existingBufferBytes);
+ // Make sure the buffer is full
+ assertFalse(buffer.hasRemaining());
+
+ try (FSDataInputStream in = fs.open(testFile)) {
+ // Attempt to read into the buffer, 0 bytes should be read since the
+ // buffer is full
+ assertEquals(0, in.read(buffer));
+
+ // Double check the buffer is still full and its contents have not
+ // changed
+ assertFalse(buffer.hasRemaining());
+ buffer.position(0);
+ byte[] bufferContents = new byte[FILE_SIZE];
+ buffer.get(bufferContents);
+ assertArrayEquals(bufferContents, existingBufferBytes);
+ }
+ }
+
+ /**
+ * Reads half of the testFile into the {@link ByteBuffer} by setting a
+ * {@link ByteBuffer#limit} on the buffer. Validates that only half of the
+ * testFile is loaded into the buffer.
+ */
+ private void testPreadWithLimitedByteBuffer(
+ ByteBuffer buffer) throws IOException {
+ int bytesRead;
+ int totalBytesRead = 0;
+ // Set the buffer limit to half the size of the file
+ buffer.limit(FILE_SIZE / 2);
+
+ try (FSDataInputStream in = fs.open(testFile)) {
+ while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+ totalBytesRead += bytesRead;
+ // Check that each call to read changes the position of the ByteBuffer
+ // correctly
+ assertEquals(totalBytesRead, buffer.position());
+ }
+
+ // Since we set the buffer limit to half the size of the file, we should
+ // have only read half of the file into the buffer
+ assertEquals(totalBytesRead, FILE_SIZE / 2);
+ // Check that the buffer is full and the contents equal the first half of
+ // the file
+ assertFalse(buffer.hasRemaining());
+ buffer.position(0);
+ byte[] bufferContents = new byte[FILE_SIZE / 2];
+ buffer.get(bufferContents);
+ assertArrayEquals(bufferContents,
+ Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+ }
+ }
+
+ /**
+ * Reads half of the testFile into the {@link ByteBuffer} by setting the
+   * {@link ByteBuffer#position} to half the size of the file. Validates that
+ * only half of the testFile is loaded into the buffer.
+ */
+ private void testPreadWithPositionedByteBuffer(
+ ByteBuffer buffer) throws IOException {
+ int bytesRead;
+ int totalBytesRead = 0;
+ // Set the buffer position to half the size of the file
+ buffer.position(FILE_SIZE / 2);
+
+ try (FSDataInputStream in = fs.open(testFile)) {
+ while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+ totalBytesRead += bytesRead;
+ // Check that each call to read changes the position of the ByteBuffer
+ // correctly
+ assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
+ }
+
+ // Since we set the buffer position to half the size of the file, we
+ // should have only read half of the file into the buffer
+ assertEquals(totalBytesRead, FILE_SIZE / 2);
+ // Check that the buffer is full and the contents equal the first half of
+ // the file
+ assertFalse(buffer.hasRemaining());
+ buffer.position(FILE_SIZE / 2);
+ byte[] bufferContents = new byte[FILE_SIZE / 2];
+ buffer.get(bufferContents);
+ assertArrayEquals(bufferContents,
+ Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+ }
+ }
+
+ /**
+ * Reads half of the testFile into the {@link ByteBuffer} by specifying a
+ * position for the pread API that is half of the file size. Validates that
+ * only half of the testFile is loaded into the buffer.
+ */
+ private void testPositionedPreadWithByteBuffer(
+ ByteBuffer buffer) throws IOException {
+ int bytesRead;
+ int totalBytesRead = 0;
+
+ try (FSDataInputStream in = fs.open(testFile)) {
+ // Start reading from halfway through the file
+ while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
+ buffer)) > 0) {
+ totalBytesRead += bytesRead;
+ // Check that each call to read changes the position of the ByteBuffer
+ // correctly
+ assertEquals(totalBytesRead, buffer.position());
+ }
+
+      // Since we started reading halfway through the file, the buffer should
+ // only be half full
+ assertEquals(totalBytesRead, FILE_SIZE / 2);
+ assertEquals(buffer.position(), FILE_SIZE / 2);
+ assertTrue(buffer.hasRemaining());
+ // Check that the buffer contents equal the second half of the file
+ buffer.position(0);
+ byte[] bufferContents = new byte[FILE_SIZE / 2];
+ buffer.get(bufferContents);
+ assertArrayEquals(bufferContents,
+ Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
+ }
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ try {
+ fs.delete(testFile, false);
+ fs.close();
+ } finally {
+ cluster.shutdown(true);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org