You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/18 12:32:06 UTC
[hbase] 04/22: HBASE-22016 Rewrite the block reading methods by
using hbase.nio.ByteBuff
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 7698baea1f4a8038693d4676091c6872b26b5d2e
Author: huzheng <op...@gmail.com>
AuthorDate: Fri Mar 8 16:46:06 2019 +0800
HBASE-22016 Rewrite the block reading methods by using hbase.nio.ByteBuff
---
.../apache/hadoop/hbase/io/hfile/BlockIOUtils.java | 223 ++++++++++++++
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 337 ++++++++-------------
...ckPositionalRead.java => TestBlockIOUtils.java} | 122 ++++++--
.../apache/hadoop/hbase/io/hfile/TestChecksum.java | 14 +-
4 files changed, 453 insertions(+), 243 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
new file mode 100644
index 0000000..dbd5b2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class BlockIOUtils {
+
+ /**
+  * Peels off every nested {@link FSDataInputStream} wrapper and checks whether the innermost
+  * stream implements {@link ByteBufferReadable}.
+  * @param is the stream to inspect
+  * @return true if the innermost wrapped stream is a {@link ByteBufferReadable}
+  */
+ static boolean isByteBufferReadable(FSDataInputStream is) {
+   InputStream innermost = is.getWrappedStream();
+   // Keep unwrapping while the wrapped stream is itself an FSDataInputStream.
+   while (innermost instanceof FSDataInputStream) {
+     innermost = ((FSDataInputStream) innermost).getWrappedStream();
+   }
+   return innermost instanceof ByteBufferReadable;
+ }
+
+ /**
+  * Read exactly <code>length</code> bytes from the stream into the destination {@link ByteBuff}.
+  * If the innermost stream is not {@link ByteBufferReadable}, the bytes are first read into a
+  * temporary on-heap array and then copied into <code>buf</code>; otherwise they are read
+  * directly into the NIO buffers backing <code>buf</code>.
+  * @param buf the destination {@link ByteBuff}
+  * @param dis the input stream to read from
+  * @param length number of bytes to read
+  * @throws IOException if the stream reaches EOF before <code>length</code> bytes were read, or
+  *           if the destination buffers do not have enough room for them
+  */
+ static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
+   if (!isByteBufferReadable(dis)) {
+     // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
+     // the destination ByteBuff.
+     byte[] heapBuf = new byte[length];
+     IOUtils.readFully(dis, heapBuf, 0, length);
+     copyToByteBuff(heapBuf, 0, length, buf);
+     return;
+   }
+   ByteBuffer[] buffers = buf.nioByteBuffers();
+   int remain = length;
+   int idx = 0;
+   ByteBuffer cur = buffers[idx];
+   while (remain > 0) {
+     // Advance to the next buffer that still has room.
+     while (!cur.hasRemaining()) {
+       if (++idx >= buffers.length) {
+         throw new IOException(
+             "Not enough ByteBuffers to read the remaining " + remain + " bytes");
+       }
+       cur = buffers[idx];
+     }
+     // Narrow the limit so that a single read cannot consume more than the bytes still wanted.
+     // Note that the limit is not restored afterwards.
+     cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+     int bytesRead = dis.read(cur);
+     if (bytesRead < 0) {
+       throw new IOException(
+           "Premature EOF from inputStream, but still need " + remain + " " + "bytes");
+     }
+     remain -= bytesRead;
+   }
+ }
+
+ /**
+ * Read from an input stream at least <code>necessaryLen</code> and if possible,
+ * <code>extraLen</code> also if available. Analogous to
+ * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
+ * bytes to also optionally read.
+ * @param in the input stream to read from
+ * @param buf the buffer to read into
+ * @param bufOffset the destination offset in the buffer
+ * @param necessaryLen the number of bytes that are absolutely necessary to read
+ * @param extraLen the number of extra bytes that would be nice to read
+ * @return true if succeeded reading the extra bytes
+ * @throws IOException if failed to read the necessary bytes
+ */
+ private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
+ int necessaryLen, int extraLen) throws IOException {
+ int bytesRemaining = necessaryLen + extraLen;
+ while (bytesRemaining > 0) {
+ int ret = in.read(buf, bufOffset, bytesRemaining);
+ if (ret < 0) {
+ if (bytesRemaining <= extraLen) {
+ // We could not read the "extra data", but that is OK.
+ break;
+ }
+ throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
+ + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+ + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
+ }
+ bufOffset += ret;
+ bytesRemaining -= ret;
+ }
+ return bytesRemaining <= 0;
+ }
+
+ /**
+  * Read at least <code>necessaryLen</code> bytes from the stream into the destination
+  * {@link ByteBuff}, and up to <code>extraLen</code> more if they happen to be available.
+  * <p>
+  * If the innermost stream is not {@link ByteBufferReadable}, the data is read into a temporary
+  * on-heap array of <code>necessaryLen + extraLen</code> bytes and that whole array is copied
+  * into <code>buf</code>. Otherwise the data is read directly into the NIO buffers backing
+  * <code>buf</code>, and reading stops as soon as the necessary bytes have arrived.
+  * @param buf the destination {@link ByteBuff}.
+  * @param dis input stream to read.
+  * @param necessaryLen bytes which we must read
+  * @param extraLen bytes which we may read
+  * @return on the direct path, true only if <code>extraLen</code> is &gt; 0 and all of the extra
+  *         bytes were read; on the heap path, the result of the on-heap read helper (true when
+  *         every requested byte was read)
+  * @throws IOException if failed to read the necessary bytes, or if the destination buffers do
+  *           not have enough room.
+  */
+ static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, int extraLen)
+     throws IOException {
+   if (!isByteBufferReadable(dis)) {
+     // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
+     // the destination ByteBuff.
+     byte[] heapBuf = new byte[necessaryLen + extraLen];
+     boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
+     copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
+     return ret;
+   }
+   ByteBuffer[] buffers = buf.nioByteBuffers();
+   int bytesRead = 0;
+   int remain = necessaryLen + extraLen;
+   int idx = 0;
+   ByteBuffer cur = buffers[idx];
+   // Only the necessary bytes are mandatory; extra bytes are taken if a read returns them.
+   while (bytesRead < necessaryLen) {
+     // Advance to the next buffer that still has room.
+     while (!cur.hasRemaining()) {
+       if (++idx >= buffers.length) {
+         throw new IOException(
+             "Not enough ByteBuffers to read the remaining " + remain + " bytes");
+       }
+       cur = buffers[idx];
+     }
+     // Narrow the limit so that a single read cannot go past necessaryLen + extraLen in total.
+     cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+     int ret = dis.read(cur);
+     if (ret < 0) {
+       throw new IOException("Premature EOF from inputStream (read returned " + ret
+           + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+           + " extra bytes, successfully read " + bytesRead);
+     }
+     bytesRead += ret;
+     remain -= ret;
+   }
+   return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
+ }
+
+ /**
+ * Read from an input stream at least <code>necessaryLen</code> and if possible,
+ * <code>extraLen</code> also if available. Analogous to
+ * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
+ * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
+ * read.
+ * @param buff ByteBuff to read into.
+ * @param dis the input stream to read from
+ * @param position the position within the stream from which to start reading
+ * @param necessaryLen the number of bytes that are absolutely necessary to read
+ * @param extraLen the number of extra bytes that would be nice to read
+ * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
+ * @throws IOException if failed to read the necessary bytes
+ */
+ static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
+ int necessaryLen, int extraLen) throws IOException {
+ int remain = necessaryLen + extraLen;
+ byte[] buf = new byte[remain];
+ int bytesRead = 0;
+ while (bytesRead < necessaryLen) {
+ int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
+ if (ret < 0) {
+ throw new IOException("Premature EOF from inputStream (positional read returned " + ret
+ + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+ + " extra bytes, successfully read " + bytesRead);
+ }
+ bytesRead += ret;
+ remain -= ret;
+ }
+ // Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
+ // will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
+ // TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
+ // preadWithExtra method for the upper layer, only need to refactor this method if the
+ // ByteBuffer pread is OK.
+ copyToByteBuff(buf, 0, bytesRead, buff);
+ return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
+ }
+
+ /**
+  * Copies <code>len</code> bytes of <code>buf</code>, starting at <code>offset</code>, into the
+  * NIO buffers backing <code>out</code>, filling each buffer from its current position before
+  * moving on to the next one.
+  * @param buf the on-heap source array
+  * @param offset index of the first byte to copy
+  * @param len number of bytes to copy
+  * @param out the destination {@link ByteBuff}
+  * @return <code>len</code>
+  * @throws IOException if the offset/len pair does not lie within <code>buf</code>, or if the
+  *           destination buffers do not have enough room
+  */
+ private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
+     throws IOException {
+   // Compare against (buf.length - offset) rather than computing (offset + len): the sum can
+   // overflow int and slip past the range check.
+   if (offset < 0 || len < 0 || len > buf.length - offset) {
+     throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
+   }
+   ByteBuffer[] buffers = out.nioByteBuffers();
+   int idx = 0, remain = len, copyLen;
+   ByteBuffer cur = buffers[idx];
+   while (remain > 0) {
+     // Advance to the next buffer that still has room.
+     while (!cur.hasRemaining()) {
+       if (++idx >= buffers.length) {
+         throw new IOException(
+             "Not enough ByteBuffers to read the remaining " + remain + " bytes");
+       }
+       cur = buffers[idx];
+     }
+     copyLen = Math.min(cur.remaining(), remain);
+     cur.put(buf, offset, copyLen);
+     remain -= copyLen;
+     offset += copyLen;
+   }
+   return len;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 91e63fd..4773678 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -280,9 +278,7 @@ public class HFileBlock implements Cacheable {
boolean usesChecksum = buf.get() == (byte) 1;
long offset = buf.getLong();
int nextBlockOnDiskSize = buf.getInt();
- HFileBlock hFileBlock =
- new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
- return hFileBlock;
+ return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
}
@Override
@@ -315,9 +311,9 @@ public class HFileBlock implements Cacheable {
* param.
*/
private HFileBlock(HFileBlock that, boolean bufCopy) {
- init(that.blockType, that.onDiskSizeWithoutHeader,
- that.uncompressedSizeWithoutHeader, that.prevBlockOffset,
- that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext);
+ init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
+ that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
+ that.fileContext);
if (bufCopy) {
this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
} else {
@@ -331,6 +327,7 @@ public class HFileBlock implements Cacheable {
* and is sitting in a byte buffer and we want to stuff the block into cache.
*
* <p>TODO: The caller presumes no checksumming
+ * <p>TODO: HFile block writer can also off-heap ? </p>
* required of this block instance since going into cache; checksum already verified on
* underlying block data pulled in from filesystem. Is that correct? What if cache is SSD?
*
@@ -349,8 +346,8 @@ public class HFileBlock implements Cacheable {
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext) {
- init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
- prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
+ init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
+ onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
this.buf = new SingleByteBuff(b);
if (fillHeader) {
overwriteHeader();
@@ -366,7 +363,8 @@ public class HFileBlock implements Cacheable {
* @param buf Has header, content, and trailing checksums if present.
*/
HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
- final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException {
+ final int nextBlockOnDiskSize, HFileContext fileContext)
+ throws IOException {
buf.rewind();
final BlockType blockType = BlockType.read(buf);
final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
@@ -394,8 +392,8 @@ public class HFileBlock implements Cacheable {
}
fileContext = fileContextBuilder.build();
assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
- init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
- prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
+ init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
+ onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
this.memType = memType;
this.offset = offset;
this.buf = buf;
@@ -406,9 +404,8 @@ public class HFileBlock implements Cacheable {
* Called from constructors.
*/
private void init(BlockType blockType, int onDiskSizeWithoutHeader,
- int uncompressedSizeWithoutHeader, long prevBlockOffset,
- long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize,
- HFileContext fileContext) {
+ int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
+ int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext) {
this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
@@ -425,10 +422,9 @@ public class HFileBlock implements Cacheable {
* @param verifyChecksum true if checksum verification is in use.
* @return Size of the block with header included.
*/
- private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf,
+ private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
boolean verifyChecksum) {
- return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) +
- headerSize(verifyChecksum);
+ return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
}
/**
@@ -651,9 +647,10 @@ public class HFileBlock implements Cacheable {
ByteBuff dup = this.buf.duplicate();
dup.position(this.headerSize());
dup = dup.slice();
+
ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
- unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
- dup);
+ unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
+
return unpacked;
}
@@ -667,15 +664,14 @@ public class HFileBlock implements Cacheable {
int headerSize = headerSize();
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
- // TODO we need consider allocating offheap here?
- ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
+ ByteBuff newBuf = new SingleByteBuff(ByteBuffer.allocate(capacityNeeded));
// Copy header bytes into newBuf.
// newBuf is HBB so no issue in calling array()
buf.position(0);
- buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
+ newBuf.put(0, buf, 0, headerSize);
- buf = new SingleByteBuff(newBuf);
+ buf = newBuf;
// set limit to exclude next block's header
buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
}
@@ -692,17 +688,6 @@ public class HFileBlock implements Cacheable {
return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
}
- /** An additional sanity-check in case no compression or encryption is being used. */
- @VisibleForTesting
- void sanityCheckUncompressedSize() throws IOException {
- if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
- throw new IOException("Using no compression but "
- + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
- + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
- + ", numChecksumbytes=" + totalChecksumBytes());
- }
- }
-
/**
* Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} when
* block is returned to the cache.
@@ -748,82 +733,6 @@ public class HFileBlock implements Cacheable {
}
/**
- * Read from an input stream at least <code>necessaryLen</code> and if possible,
- * <code>extraLen</code> also if available. Analogous to
- * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
- * number of "extra" bytes to also optionally read.
- *
- * @param in the input stream to read from
- * @param buf the buffer to read into
- * @param bufOffset the destination offset in the buffer
- * @param necessaryLen the number of bytes that are absolutely necessary to read
- * @param extraLen the number of extra bytes that would be nice to read
- * @return true if succeeded reading the extra bytes
- * @throws IOException if failed to read the necessary bytes
- */
- static boolean readWithExtra(InputStream in, byte[] buf,
- int bufOffset, int necessaryLen, int extraLen) throws IOException {
- int bytesRemaining = necessaryLen + extraLen;
- while (bytesRemaining > 0) {
- int ret = in.read(buf, bufOffset, bytesRemaining);
- if (ret == -1 && bytesRemaining <= extraLen) {
- // We could not read the "extra data", but that is OK.
- break;
- }
- if (ret < 0) {
- throw new IOException("Premature EOF from inputStream (read "
- + "returned " + ret + ", was trying to read " + necessaryLen
- + " necessary bytes and " + extraLen + " extra bytes, "
- + "successfully read "
- + (necessaryLen + extraLen - bytesRemaining));
- }
- bufOffset += ret;
- bytesRemaining -= ret;
- }
- return bytesRemaining <= 0;
- }
-
- /**
- * Read from an input stream at least <code>necessaryLen</code> and if possible,
- * <code>extraLen</code> also if available. Analogous to
- * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
- * positional read and specifies a number of "extra" bytes that would be
- * desirable but not absolutely necessary to read.
- *
- * @param in the input stream to read from
- * @param position the position within the stream from which to start reading
- * @param buf the buffer to read into
- * @param bufOffset the destination offset in the buffer
- * @param necessaryLen the number of bytes that are absolutely necessary to
- * read
- * @param extraLen the number of extra bytes that would be nice to read
- * @return true if and only if extraLen is > 0 and reading those extra bytes
- * was successful
- * @throws IOException if failed to read the necessary bytes
- */
- @VisibleForTesting
- static boolean positionalReadWithExtra(FSDataInputStream in,
- long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
- throws IOException {
- int bytesRemaining = necessaryLen + extraLen;
- int bytesRead = 0;
- while (bytesRead < necessaryLen) {
- int ret = in.read(position, buf, bufOffset, bytesRemaining);
- if (ret < 0) {
- throw new IOException("Premature EOF from inputStream (positional read "
- + "returned " + ret + ", was trying to read " + necessaryLen
- + " necessary bytes and " + extraLen + " extra bytes, "
- + "successfully read " + bytesRead);
- }
- position += ret;
- bufOffset += ret;
- bytesRemaining -= ret;
- bytesRead += ret;
- }
- return bytesRead != necessaryLen && bytesRemaining <= 0;
- }
-
- /**
* Unified version 2 {@link HFile} block writer. The intended usage pattern
* is as follows:
* <ol>
@@ -988,18 +897,6 @@ public class HFileBlock implements Cacheable {
}
/**
- * Returns the stream for the user to write to. The block writer takes care
- * of handling compression and buffering for caching on write. Can only be
- * called in the "writing" state.
- *
- * @return the data output stream for the user to write to
- */
- DataOutputStream getUserDataStream() {
- expectState(State.WRITING);
- return userDataStream;
- }
-
- /**
* Transitions the block writer from the "writing" state to the "block
* ready" state. Does nothing if a block is already finished.
*/
@@ -1261,11 +1158,9 @@ public class HFileBlock implements Cacheable {
}
/**
- * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is
- * needed for storing packed blocks in the block cache. Expects calling semantics identical to
- * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
- * Does not include checksum data.
- *
+ * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
+ * for storing packed blocks in the block cache. Returns only the header and data, Does not
+ * include checksum data.
* @return Returns a copy of block bytes for caching on write
*/
private ByteBuffer cloneOnDiskBufferWithHeader() {
@@ -1321,11 +1216,10 @@ public class HFileBlock implements Cacheable {
.withIncludesMvcc(fileContext.isIncludesMvcc())
.withIncludesTags(fileContext.isIncludesTags())
.build();
- return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
+ return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset,
- cacheConf.shouldCacheCompressed(blockType.getCategory())?
- cloneOnDiskBufferWithHeader() :
- cloneUncompressedBufferWithHeader(),
+ cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
+ : cloneUncompressedBufferWithHeader(),
FILL_HEADER, startOffset, UNSET,
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
}
@@ -1415,8 +1309,8 @@ public class HFileBlock implements Cacheable {
*/
private static class PrefetchedHeader {
long offset = -1;
- byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
- final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+ byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
+ final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
@Override
public String toString() {
@@ -1479,11 +1373,11 @@ public class HFileBlock implements Cacheable {
}
/**
- * A constructor that reads files with the latest minor version.
- * This is used by unit tests only.
+ * A constructor that reads files with the latest minor version. This is used by unit tests
+ * only.
*/
FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
- throws IOException {
+ throws IOException {
this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
}
@@ -1520,60 +1414,49 @@ public class HFileBlock implements Cacheable {
}
/**
- * Does a positional read or a seek and read into the given buffer. Returns
- * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF.
- *
+ * Does a positional read or a seek and read into the given byte buffer. We need to take care to
+ * call {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers,
+ * otherwise a memory leak may happen.
* @param dest destination buffer
- * @param destOffset offset into the destination buffer at where to put the bytes we read
* @param size size of read
* @param peekIntoNextBlock whether to read the next block's on-disk size
* @param fileOffset position in the stream to read at
* @param pread whether we should do a positional read
* @param istream The input source of data
- * @return the on-disk size of the next block with header size included, or
- * -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the
- * next header
- * @throws IOException
+ * @return true if the destination buffer includes the next block header; false if it only
+ * includes the current block data without the next block header.
+ * @throws IOException if any IO error happens.
*/
- @VisibleForTesting
- protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
- boolean peekIntoNextBlock, long fileOffset, boolean pread)
- throws IOException {
- if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
- // We are asked to read the next block's header as well, but there is
- // not enough room in the array.
- throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
- " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
- }
-
+ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
+ boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
if (!pread) {
// Seek + read. Better for scanning.
HFileUtil.seekOnMultipleSources(istream, fileOffset);
- // TODO: do we need seek time latencies?
long realOffset = istream.getPos();
if (realOffset != fileOffset) {
- throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
- " bytes, but pos=" + realOffset + " after seek");
+ throw new IOException("Tried to seek to " + fileOffset + " to read " + size
+ + " bytes, but pos=" + realOffset + " after seek");
}
-
if (!peekIntoNextBlock) {
- IOUtils.readFully(istream, dest, destOffset, size);
- return -1;
+ BlockIOUtils.readFully(dest, istream, size);
+ return false;
}
- // Try to read the next block header.
- if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
- return -1;
+ // Try to read the next block header
+ if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
+ // did not read the next block header.
+ return false;
}
} else {
// Positional read. Better for random reads; or when the streamLock is already locked.
int extraSize = peekIntoNextBlock ? hdrSize : 0;
- if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) {
- return -1;
+ if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
+ // did not read the next block header.
+ return false;
}
}
assert peekIntoNextBlock;
- return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
+ return true;
}
/**
@@ -1672,7 +1555,7 @@ public class HFileBlock implements Cacheable {
* is not right.
* @throws IOException
*/
- private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf,
+ private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
final long offset, boolean verifyChecksum)
throws IOException {
// Assert size provided aligns with what is in the header
@@ -1691,11 +1574,11 @@ public class HFileBlock implements Cacheable {
* we have to backup the stream because we over-read (the next block's header).
* @see PrefetchedHeader
* @return The cached block header or null if not found.
- * @see #cacheNextBlockHeader(long, byte[], int, int)
+ * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
*/
- private ByteBuffer getCachedHeader(final long offset) {
+ private ByteBuff getCachedHeader(final long offset) {
PrefetchedHeader ph = this.prefetchedHeader.get();
- return ph != null && ph.offset == offset? ph.buf: null;
+ return ph != null && ph.offset == offset ? ph.buf : null;
}
/**
@@ -1704,13 +1587,24 @@ public class HFileBlock implements Cacheable {
* @see PrefetchedHeader
*/
private void cacheNextBlockHeader(final long offset,
- final byte [] header, final int headerOffset, final int headerLength) {
+ ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) {
PrefetchedHeader ph = new PrefetchedHeader();
ph.offset = offset;
- System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
+ onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
this.prefetchedHeader.set(ph);
}
+ /**
+  * Works out the on-disk size of the next block from the bytes that follow the current block.
+  * @param readNextHeader whether the next block's header was read into <code>onDiskBlock</code>
+  * @param onDiskBlock buffer holding the current block
+  * @param onDiskSizeWithHeader on-disk size of the current block, header included
+  * @return -1 if <code>readNextHeader</code> is false; otherwise the int fetched via
+  *         <code>getIntAfterPosition</code> at <code>onDiskSizeWithHeader</code> plus the magic
+  *         length, with <code>hdrSize</code> added
+  */
+ private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
+     int onDiskSizeWithHeader) {
+   if (!readNextHeader) {
+     return -1;
+   }
+   int sizeFieldOffset = onDiskSizeWithHeader + BlockType.MAGIC_LENGTH;
+   return onDiskBlock.getIntAfterPosition(sizeFieldOffset) + hdrSize;
+ }
+
/**
* Reads a version 2 block.
*
@@ -1737,7 +1631,7 @@ public class HFileBlock implements Cacheable {
// Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
// and will save us having to seek the stream backwards to reread the header we
// read the last time through here.
- ByteBuffer headerBuf = getCachedHeader(offset);
+ ByteBuff headerBuf = getCachedHeader(offset);
LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
"onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
verifyChecksum, headerBuf, onDiskSizeWithHeader);
@@ -1757,9 +1651,9 @@ public class HFileBlock implements Cacheable {
if (LOG.isTraceEnabled()) {
LOG.trace("Extra see to get block size!", new RuntimeException());
}
- headerBuf = ByteBuffer.allocate(hdrSize);
- readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
- offset, pread);
+ headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize));
+ readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
+ headerBuf.rewind();
}
onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
}
@@ -1770,46 +1664,55 @@ public class HFileBlock implements Cacheable {
// says where to start reading. If we have the header cached, then we don't need to read
// it again and we can likely read from last place we left off w/o need to backup and reread
// the header we read last time through here.
- // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
- byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
- int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
+ ByteBuff onDiskBlock =
+ new SingleByteBuff(ByteBuffer.allocate(onDiskSizeWithHeader + hdrSize));
+ boolean initHFileBlockSuccess = false;
+ try {
+ if (headerBuf != null) {
+ onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
+ }
+ boolean readNextHeader = readAtOffset(is, onDiskBlock,
onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
- if (headerBuf != null) {
- // The header has been read when reading the previous block OR in a distinct header-only
- // read. Copy to this block's header.
- System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
- } else {
- headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
- }
- // Do a few checks before we go instantiate HFileBlock.
- assert onDiskSizeWithHeader > this.hdrSize;
- verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
- ByteBuff onDiskBlockByteBuff =
- new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader));
- // Verify checksum of the data before using it for building HFileBlock.
- if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuff, hdrSize)) {
- return null;
- }
- long duration = System.currentTimeMillis() - startTime;
- if (updateMetrics) {
- HFile.updateReadLatency(duration, pread);
- }
- // The onDiskBlock will become the headerAndDataBuffer for this block.
- // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
- // contains the header of next block, so no need to set next block's header in it.
- HFileBlock hFileBlock = new HFileBlock(onDiskBlockByteBuff, checksumSupport,
- MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
- // Run check on uncompressed sizings.
- if (!fileContext.isCompressedOrEncrypted()) {
- hFileBlock.sanityCheckUncompressed();
- }
- LOG.trace("Read {} in {} ns", hFileBlock, duration);
- // Cache next block header if we read it for the next time through here.
- if (nextBlockOnDiskSize != -1) {
- cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
- onDiskBlock, onDiskSizeWithHeader, hdrSize);
+ onDiskBlock.rewind(); // in case of moving position when copying a cached header
+ int nextBlockOnDiskSize =
+ getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
+ if (headerBuf == null) {
+ headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
+ }
+ // Do a few checks before we go instantiate HFileBlock.
+ assert onDiskSizeWithHeader > this.hdrSize;
+ verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
+ ByteBuff curBlock = onDiskBlock.duplicate().limit(onDiskSizeWithHeader);
+ // Verify checksum of the data before using it for building HFileBlock.
+ if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
+ return null;
+ }
+ long duration = System.currentTimeMillis() - startTime;
+ if (updateMetrics) {
+ HFile.updateReadLatency(duration, pread);
+ }
+ // The onDiskBlock will become the headerAndDataBuffer for this block.
+ // If nextBlockOnDiskSize is not -1, the onDiskBlock already contains the header of the
+ // next block, so no need to set next block's header in it.
+ HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
+ offset, nextBlockOnDiskSize, fileContext);
+ // Run check on uncompressed sizings.
+ if (!fileContext.isCompressedOrEncrypted()) {
+ hFileBlock.sanityCheckUncompressed();
+ }
+ LOG.trace("Read {} in {} ns", hFileBlock, duration);
+ // Cache next block header if we read it for the next time through here.
+ if (nextBlockOnDiskSize != -1) {
+ cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
+ onDiskSizeWithHeader, hdrSize);
+ }
+ initHFileBlockSuccess = true;
+ return hFileBlock;
+ } finally {
+ if (!initHFileBlockSuccess) {
+ onDiskBlock.release();
+ }
}
- return hFileBlock;
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
similarity index 54%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index a13c868..60180e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -17,33 +17,115 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.nio.ByteBuffer;
+
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
-/**
- * Unit test suite covering HFileBlock positional read logic.
- */
-@Category({IOTests.class, SmallTests.class})
-public class TestHFileBlockPositionalRead {
+@Category({ IOTests.class, SmallTests.class })
+public class TestBlockIOUtils {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestHFileBlockPositionalRead.class);
+ HBaseClassTestRule.forClass(TestBlockIOUtils.class);
@Rule
public ExpectedException exception = ExpectedException.none();
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Test
+ public void testIsByteBufferReadable() throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable");
+ try (FSDataOutputStream out = fs.create(p)) {
+ out.writeInt(23);
+ }
+ try (FSDataInputStream is = fs.open(p)) {
+ assertFalse(BlockIOUtils.isByteBufferReadable(is));
+ }
+ }
+
+ @Test
+ public void testReadFully() throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
+ String s = "hello world";
+ try (FSDataOutputStream out = fs.create(p)) {
+ out.writeBytes(s);
+ }
+ ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
+ try (FSDataInputStream in = fs.open(p)) {
+ BlockIOUtils.readFully(buf, in, 11);
+ }
+ buf.rewind();
+ byte[] heapBuf = new byte[s.length()];
+ buf.get(heapBuf, 0, heapBuf.length);
+ assertArrayEquals(Bytes.toBytes(s), heapBuf);
+ }
+
+ @Test
+ public void testReadWithExtra() throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra");
+ String s = "hello world";
+ try (FSDataOutputStream out = fs.create(p)) {
+ out.writeBytes(s);
+ }
+ ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
+ try (FSDataInputStream in = fs.open(p)) {
+ assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
+ }
+ buf.rewind();
+ byte[] heapBuf = new byte[buf.capacity()];
+ buf.get(heapBuf, 0, heapBuf.length);
+ assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
+
+ buf = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
+ try (FSDataInputStream in = fs.open(p)) {
+ assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
+ }
+ buf.rewind();
+ heapBuf = new byte[11];
+ buf.get(heapBuf, 0, heapBuf.length);
+ assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
+
+ buf.position(0).limit(12);
+ try (FSDataInputStream in = fs.open(p)) {
+ try {
+ BlockIOUtils.readWithExtra(buf, in, 12, 0);
+ fail("Should only read 11 bytes");
+ } catch (IOException e) {
+
+ }
+ }
+ }
+
@Test
public void testPositionalReadNoExtra() throws IOException {
long position = 0;
@@ -52,10 +134,10 @@ public class TestHFileBlockPositionalRead {
int extraLen = 0;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
+ ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
- boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
- bufOffset, necessaryLen, extraLen);
+ boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in);
@@ -69,11 +151,11 @@ public class TestHFileBlockPositionalRead {
int extraLen = 0;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
+ ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 5)).thenReturn(5);
- boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
- bufOffset, necessaryLen, extraLen);
+ boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 5);
@@ -88,10 +170,10 @@ public class TestHFileBlockPositionalRead {
int extraLen = 5;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
+ ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
- boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
- bufOffset, necessaryLen, extraLen);
+ boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in);
@@ -105,10 +187,10 @@ public class TestHFileBlockPositionalRead {
int extraLen = 5;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
+ ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
- boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
- bufOffset, necessaryLen, extraLen);
+ boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertFalse("Expect false return when reading extra bytes fails", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in);
@@ -123,11 +205,11 @@ public class TestHFileBlockPositionalRead {
int extraLen = 5;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
+ ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 10)).thenReturn(10);
- boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf,
- bufOffset, necessaryLen, extraLen);
+ boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 10);
@@ -142,12 +224,12 @@ public class TestHFileBlockPositionalRead {
int extraLen = 0;
int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen];
+ ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
exception.expect(IOException.class);
exception.expectMessage("EOF");
- HFileBlock.positionalReadWithExtra(in, position, buf, bufOffset,
- necessaryLen, extraLen);
+ BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index e93b61e..a4135d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -398,23 +398,25 @@ public class TestChecksum {
return b;
}
+
@Override
- protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
+ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
- int returnValue = super.readAtOffset(istream, dest, destOffset, size, peekIntoNextBlock,
- fileOffset, pread);
+ int destOffset = dest.position();
+ boolean returnValue =
+ super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
if (!corruptDataStream) {
return returnValue;
}
// Corrupt 3rd character of block magic of next block's header.
if (peekIntoNextBlock) {
- dest[destOffset + size + 3] = 0b00000000;
+ dest.put(destOffset + size + 3, (byte) 0b00000000);
}
// We might be reading this block's header too, corrupt it.
- dest[destOffset + 1] = 0b00000000;
+ dest.put(destOffset + 1, (byte) 0b00000000);
// Corrupt non header data
if (size > hdrSize) {
- dest[destOffset + hdrSize + 1] = 0b00000000;
+ dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
}
return returnValue;
}