Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/18 12:32:12 UTC
[hbase] 10/22: HBASE-21937 Make Compression#decompress accept ByteBuff as input
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6b584dea512cfaa6d1f56da581fc0823d7e55adb
Author: huzheng <op...@gmail.com>
AuthorDate: Tue Apr 2 20:44:08 2019 +0800
HBASE-21937 Make Compression#decompress accept ByteBuff as input
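In short, the old byte[]-plus-offset destination becomes a ByteBuff and the
compressedSize parameter goes away, so callers no longer need an on-heap array
backing the output. Side by side (signatures as in the hunks below):

    // Before this patch:
    public static void decompress(byte[] dest, int destOffset,
        InputStream bufferedBoundedStream, int compressedSize,
        int uncompressedSize, Compression.Algorithm compressAlgo)

    // After this patch:
    public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream,
        int uncompressedSize, Compression.Algorithm compressAlgo)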
---
.../hadoop/hbase/io/compress/Compression.java | 51 ++++++++--------------
.../encoding/HFileBlockDefaultDecodingContext.java | 11 +++--
.../apache/hadoop/hbase/io/util}/BlockIOUtils.java | 45 ++++++++++++++++---
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 29 +++++++-----
.../io/encoding/TestLoadAndSwitchEncodeOnDisk.java | 2 -
.../hadoop/hbase/io/hfile/TestBlockIOUtils.java | 1 +
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 18 ++++----
7 files changed, 90 insertions(+), 67 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
index d258ba2..3004973 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
@@ -25,7 +25,8 @@ import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -438,45 +439,29 @@ public final class Compression {
}
/**
- * Decompresses data from the given stream using the configured compression
- * algorithm. It will throw an exception if the dest buffer does not have
- * enough space to hold the decompressed data.
- *
- * @param dest
- * the output bytes buffer
- * @param destOffset
- * start writing position of the output buffer
- * @param bufferedBoundedStream
- * a stream to read compressed data from, bounded to the exact amount
+ * Decompresses data from the given stream using the configured compression algorithm. It will
+ * throw an exception if the dest buffer does not have enough space to hold the decompressed data.
+ * @param dest the output buffer
+ * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount
* of compressed data
- * @param compressedSize
- * compressed data size, header not included
- * @param uncompressedSize
- * uncompressed data size, header not included
- * @param compressAlgo
- * compression algorithm used
- * @throws IOException
+ * @param uncompressedSize uncompressed data size, header not included
+ * @param compressAlgo compression algorithm used
+ * @throws IOException if any IO error happens
*/
- public static void decompress(byte[] dest, int destOffset,
- InputStream bufferedBoundedStream, int compressedSize,
- int uncompressedSize, Compression.Algorithm compressAlgo)
- throws IOException {
-
- if (dest.length - destOffset < uncompressedSize) {
- throw new IllegalArgumentException(
- "Output buffer does not have enough space to hold "
- + uncompressedSize + " decompressed bytes, available: "
- + (dest.length - destOffset));
+ public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream,
+ int uncompressedSize, Compression.Algorithm compressAlgo) throws IOException {
+ if (dest.remaining() < uncompressedSize) {
+ throw new IllegalArgumentException("Output buffer does not have enough space to hold "
+ + uncompressedSize + " decompressed bytes, available: " + dest.remaining());
}
Decompressor decompressor = null;
try {
decompressor = compressAlgo.getDecompressor();
- InputStream is = compressAlgo.createDecompressionStream(
- bufferedBoundedStream, decompressor, 0);
-
- IOUtils.readFully(is, dest, destOffset, uncompressedSize);
- is.close();
+ try (InputStream is =
+ compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0)) {
+ BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize);
+ }
} finally {
if (decompressor != null) {
compressAlgo.returnDecompressor(decompressor);
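A minimal caller-side sketch of the new method; the variable names and sizes
here are hypothetical, and the destination may now be a direct (off-heap)
ByteBuff:

    import java.io.ByteArrayInputStream;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.SingleByteBuff;

    // Hypothetical sizes; in HBase these come from the block header. The
    // onDiskBytes array must hold valid GZ-compressed data for this to run.
    int compressedSize = 512, uncompressedSize = 2048;
    byte[] onDiskBytes = new byte[compressedSize];
    ByteBuff dest = new SingleByteBuff(ByteBuffer.allocateDirect(uncompressedSize));
    Compression.decompress(dest, new ByteArrayInputStream(onDiskBytes, 0, compressedSize),
        uncompressedSize, Compression.Algorithm.GZ);
    // dest's position has advanced by uncompressedSize; rewind() before reading back.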
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index d5bf58c..97d0e6b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Decryptor;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -87,14 +88,12 @@ public class HFileBlockDefaultDecodingContext implements
}
Compression.Algorithm compression = fileContext.getCompression();
- assert blockBufferWithoutHeader.hasArray();
if (compression != Compression.Algorithm.NONE) {
- Compression.decompress(blockBufferWithoutHeader.array(),
- blockBufferWithoutHeader.arrayOffset(), dataInputStream, onDiskSizeWithoutHeader,
- uncompressedSizeWithoutHeader, compression);
+ Compression.decompress(blockBufferWithoutHeader, dataInputStream,
+ uncompressedSizeWithoutHeader, compression);
} else {
- IOUtils.readFully(dataInputStream, blockBufferWithoutHeader.array(),
- blockBufferWithoutHeader.arrayOffset(), onDiskSizeWithoutHeader);
+ BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, blockBufferWithoutHeader,
+ onDiskSizeWithoutHeader);
}
} finally {
byteBuffInputStream.close();
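With the hasArray() assert gone, the decoding context can hand a buffer with no
backing array to either branch; a hypothetical sketch of such a destination
(imports as in the sketch above):

    // A direct (off-heap) ByteBuff now works here, since neither the compressed
    // nor the uncompressed path calls array()/arrayOffset() anymore.
    ByteBuff blockBufferWithoutHeader =
        new SingleByteBuff(ByteBuffer.allocateDirect(uncompressedSizeWithoutHeader));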
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
similarity index 86%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
rename to hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
index dbd5b2e..a98a478 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.io.hfile;
+package org.apache.hadoop.hbase.io.util;
import java.io.IOException;
import java.io.InputStream;
@@ -29,9 +29,14 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-class BlockIOUtils {
+public final class BlockIOUtils {
- static boolean isByteBufferReadable(FSDataInputStream is) {
+ // Disallow instantiation
+ private BlockIOUtils() {
+ }
+
+ public static boolean isByteBufferReadable(FSDataInputStream is) {
InputStream cur = is.getWrappedStream();
for (;;) {
if ((cur instanceof FSDataInputStream)) {
@@ -50,7 +55,7 @@ class BlockIOUtils {
* @param length bytes to read.
* @throws IOException exception to throw if any error happens
*/
- static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
+ public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
if (!isByteBufferReadable(dis)) {
// If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
// the destination ByteBuff.
@@ -82,6 +87,32 @@ class BlockIOUtils {
}
/**
+ * Copy bytes from the given InputStream to the {@link ByteBuff}, using a temporary heap byte[]
+ * (currently 1024 bytes) as the staging buffer.
+ * @param in the InputStream to read from
+ * @param out the destination {@link ByteBuff}
+ * @param length the number of bytes to read
+ * @throws IOException if any IO error is encountered
+ */
+ public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
+ throws IOException {
+ if (length < 0) {
+ throw new IllegalArgumentException("Length must not be negative: " + length);
+ }
+ byte[] buffer = new byte[1024];
+ int remain = length, count;
+ while (remain > 0) {
+ count = in.read(buffer, 0, Math.min(remain, buffer.length));
+ if (count < 0) {
+ throw new IOException(
+ "Premature EOF from inputStream, but still need " + remain + " bytes");
+ }
+ out.put(buffer, 0, count);
+ remain -= count;
+ }
+ }
+
+ /**
* Read from an input stream at least <code>necessaryLen</code> and if possible,
* <code>extraLen</code> also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
@@ -125,8 +156,8 @@ class BlockIOUtils {
* ByteBuffers, otherwise we've not read the extraLen bytes yet.
* @throws IOException if failed to read the necessary bytes.
*/
- static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, int extraLen)
- throws IOException {
+ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
+ int extraLen) throws IOException {
if (!isByteBufferReadable(dis)) {
// If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
// the destination ByteBuff.
@@ -174,7 +205,7 @@ class BlockIOUtils {
* @return true if and only if extraLen is > 0 and reading those extra bytes was successful
* @throws IOException if failed to read the necessary bytes
*/
- static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
+ public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
int remain = necessaryLen + extraLen;
byte[] buf = new byte[remain];
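A small self-contained sketch of the new readFullyWithHeapBuffer helper (the
data here is hypothetical; the 1024-byte staging buffer is internal to the
method):

    import java.io.ByteArrayInputStream;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.util.BlockIOUtils;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.SingleByteBuff;

    byte[] src = new byte[4096]; // stands in for decompressed stream content
    ByteBuff out = new SingleByteBuff(ByteBuffer.allocateDirect(src.length));
    // Copies in chunks of at most 1024 bytes; throws IOException on premature EOF.
    BlockIOUtils.readFullyWithHeapBuffer(new ByteArrayInputStream(src), out, src.length);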
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 2c8fa4d..a3738d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -527,14 +528,22 @@ public class HFileBlock implements Cacheable {
}
/**
- * Returns a buffer that does not include the header or checksum.
- *
+ * Returns a buffer that does not include the header and checksum.
* @return the buffer with header skipped and checksum omitted.
*/
public ByteBuff getBufferWithoutHeader() {
+ return this.getBufferWithoutHeader(false);
+ }
+
+ /**
+ * Returns a buffer that does not include the header, optionally keeping the checksum.
+ * @param withChecksum whether to include the checksum in the returned buffer.
+ * @return the buffer with the header skipped and, unless withChecksum is true, the checksum
+ * omitted.
+ */
+ public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
ByteBuff dup = getBufferReadOnly();
- // Now set it up so Buffer spans content only -- no header or no checksums.
- return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice();
+ int delta = withChecksum ? 0 : totalChecksumBytes();
+ return dup.position(headerSize()).limit(buf.limit() - delta).slice();
}
/**
@@ -608,8 +617,9 @@ public class HFileBlock implements Cacheable {
// We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
// block's header, so there are two sensible values for buffer capacity.
int hdrSize = headerSize();
- if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) {
- throw new AssertionError("Invalid buffer capacity: " + dup.capacity() +
+ dup.rewind();
+ if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
+ throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
}
}
@@ -671,15 +681,15 @@ public class HFileBlock implements Cacheable {
HFileBlock unpacked = new HFileBlock(this);
unpacked.allocateBuffer(); // allocates space for the decompressed block
- HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
- reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
+ HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
+ ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
ByteBuff dup = this.buf.duplicate();
dup.position(this.headerSize());
dup = dup.slice();
ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
- unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
+ unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
return unpacked;
}
@@ -697,7 +707,6 @@ public class HFileBlock implements Cacheable {
ByteBuff newBuf = allocator.allocate(capacityNeeded);
// Copy header bytes into newBuf.
- // newBuf is HBB so no issue in calling array()
buf.position(0);
newBuf.put(0, buf, 0, headerSize);
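The two overloads side by side (a sketch; "block" stands for a loaded
HFileBlock):

    ByteBuff payload = block.getBufferWithoutHeader();       // header skipped, checksum dropped
    ByteBuff withSums = block.getBufferWithoutHeader(true);  // header skipped, checksum kept
    // unpack() above passes the checksum-inclusive view to prepareDecoding().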
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
index fb9e44f..d53d24e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.TestMiniClusterLoadSequential;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized.Parameters;
@@ -74,7 +73,6 @@ public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential
@Override
@Test
- @Ignore("TODO Ignore this UT temporarily, will fix this in the critical HBASE-21937.")
public void loadTest() throws Exception {
Admin admin = TEST_UTIL.getAdmin();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index 60180e6..a386f49 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 2733ca2..af42a24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -97,8 +97,7 @@ public class TestHFileBlock {
private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
- // TODO let uncomment the GZ algorithm in HBASE-21937, because no support BB unpack yet.
- static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ */ };
+ static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
private static final int NUM_TEST_BLOCKS = 1000;
private static final int NUM_READER_THREADS = 26;
@@ -623,7 +622,7 @@ public class TestHFileBlock {
if (detailedLogging) {
LOG.info("Reading block #" + i + " at offset " + curOffset);
}
- HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, true);
+ HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false);
if (detailedLogging) {
LOG.info("Block #" + i + ": " + b);
}
@@ -638,7 +637,7 @@ public class TestHFileBlock {
// Now re-load this block knowing the on-disk size. This tests a
// different branch in the loader.
HFileBlock b2 =
- hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, true);
+ hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false);
b2.sanityCheck();
assertEquals(b.getBlockType(), b2.getBlockType());
@@ -667,11 +666,10 @@ public class TestHFileBlock {
// expectedContents have header + data only
ByteBuff bufRead = newBlock.getBufferReadOnly();
ByteBuffer bufExpected = expectedContents.get(i);
- boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
- bufRead.arrayOffset(),
- bufRead.limit() - newBlock.totalChecksumBytes(),
- bufExpected.array(), bufExpected.arrayOffset(),
- bufExpected.limit()) == 0;
+ byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
+ bufRead.get(tmp, 0, tmp.length);
+ boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
+ bufExpected.arrayOffset(), bufExpected.limit()) == 0;
String wrongBytesMsg = "";
if (!bytesAreCorrect) {
@@ -702,6 +700,8 @@ public class TestHFileBlock {
if (newBlock != b) {
assertTrue(b.release());
}
+ } else {
+ assertTrue(b.release());
}
}
assertEquals(curOffset, fs.getFileStatus(path).getLen());