Posted to commits@hbase.apache.org by bb...@apache.org on 2022/07/04 20:18:52 UTC
[hbase] branch master updated: HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new f3f292fad41 HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)
f3f292fad41 is described below
commit f3f292fad414526e981d299c66ba057765543179
Author: Bryan Beaudreault <bb...@hubspot.com>
AuthorDate: Mon Jul 4 16:18:45 2022 -0400
HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../apache/hadoop/hbase/io/ByteBuffAllocator.java | 19 +-
.../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 23 ++
.../java/org/apache/hadoop/hbase/nio/RefCnt.java | 62 +++-
.../hadoop/hbase/io/TestByteBuffAllocator.java | 18 +-
.../io/TestByteBuffAllocatorLeakDetection.java | 341 +++++++++++++++++++++
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 61 ++--
.../hbase/io/hfile/TestHFileBlockUnpack.java | 169 ++++++++++
7 files changed, 664 insertions(+), 29 deletions(-)
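For context beyond the subject line: before this change, HFileBlock#unpack() cloned the compressed block and carried its shared-memory flag over to the decompressed copy. A compressed block smaller than the allocator's minimum allocation size is served from heap, while its decompressed form can be large enough to come from the off-heap reservoir; the clone was then treated as exclusive on-heap memory, so callers never released the pooled buffers back. A minimal sketch of that size mismatch, with illustrative sizes (not code from this commit):

    Configuration conf = new Configuration();
    // anything smaller than 10 KB is allocated on heap rather than pooled
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 10 * 1024);
    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);

    ByteBuff compressed = alloc.allocate(8 * 1024);    // on heap, no recycler
    ByteBuff decompressed = alloc.allocate(32 * 1024); // off-heap, pooled

    // Before HBASE-27170, the unpacked block inherited isSharedMem() == false
    // from the small compressed block, so this release() was skipped and the
    // pooled buffers backing "decompressed" leaked.
    decompressed.release();
    compressed.release();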
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index ecef63662f6..60b89223c16 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -319,11 +319,20 @@ public class ByteBuffAllocator {
// just allocate the ByteBuffer from on-heap.
bbs.add(allocateOnHeap(remain));
}
- ByteBuff bb = ByteBuff.wrap(bbs, () -> {
- for (int i = 0; i < lenFromReservoir; i++) {
- this.putbackBuffer(bbs.get(i));
- }
- });
+
+ ByteBuff bb;
+ // we only need a recycler if we successfully pulled from the pool
+ // this matters for determining whether to add leak detection in RefCnt
+ if (lenFromReservoir == 0) {
+ bb = ByteBuff.wrap(bbs);
+ } else {
+ bb = ByteBuff.wrap(bbs, () -> {
+ for (int i = 0; i < lenFromReservoir; i++) {
+ this.putbackBuffer(bbs.get(i));
+ }
+ });
+ }
+
bb.limit(size);
return bb;
}
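The effect of this hunk, sketched with the test-only accessors added further down (this mirrors the new testRecycleOnlyPooledBuffers rather than being part of the commit):

    // minSize = bufSize / 8, as in the test; the request stays below it
    ByteBuff buff = alloc.allocate(minSize - 1);
    // lenFromReservoir == 0, so the ByteBuff is wrapped without a recycler and
    // its RefCnt keeps ByteBuffAllocator.NONE. RefCnt (next file) then skips
    // leak tracking: a heap buffer that is simply GC'd is not a leak.
    assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
    buff.release(); // still safe; NONE.free() is a no-op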
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 9e2ccc33131..2925fab161b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.nio;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -547,6 +548,28 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
return wrap(buffer, RefCnt.create());
}
+ /**
+ * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
+ * potential buffer leaks. We pass the buffer itself as a default hint, but one can use
+ * {@link #touch(Object)} to pass their own hint as well.
+ */
+ @Override
+ public ByteBuff touch() {
+ return touch(this);
+ }
+
+ @Override
+ public ByteBuff touch(Object hint) {
+ refCnt.touch(hint);
+ return this;
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public RefCnt getRefCnt() {
+ return refCnt;
+ }
+
/**
* Make this private because we don't want to expose the refCnt related wrap method to upstream.
*/
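A sketch of how the new touch() hooks are meant to be used at hand-off points; the call sites below are hypothetical, only the touch API itself comes from this commit:

    ByteBuff block = alloc.allocate(64 * 1024);
    block.touch("read from HFile");                // recorded only for pooled buffers
    responseQueue.offer(block.touch("queued for RPC response")); // hypothetical queue
    // If the buffer is later GC'd without release(), the leak report includes
    // these hints, pointing at the last place the buffer was handled.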
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
index c7b6dbf7086..7c1f23383d3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.nio;
+import com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetectorFactory;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakTracker;
/**
* Maintain a reference count integer inside to track the life cycle of {@link ByteBuff}, if the
@@ -31,7 +35,10 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
@InterfaceAudience.Private
public class RefCnt extends AbstractReferenceCounted {
- private Recycler recycler = ByteBuffAllocator.NONE;
+ private static final ResourceLeakDetector<RefCnt> detector =
+ ResourceLeakDetectorFactory.instance().newResourceLeakDetector(RefCnt.class);
+ private final Recycler recycler;
+ private final ResourceLeakTracker<RefCnt> leak;
/**
* Create a {@link RefCnt} with an initial reference count = 1. If the reference count becomes
@@ -49,15 +56,66 @@ public class RefCnt extends AbstractReferenceCounted {
public RefCnt(Recycler recycler) {
this.recycler = recycler;
+ this.leak = recycler == ByteBuffAllocator.NONE ? null : detector.track(this);
+ }
+
+ @Override
+ public ReferenceCounted retain() {
+ maybeRecord();
+ return super.retain();
+ }
+
+ @Override
+ public ReferenceCounted retain(int increment) {
+ maybeRecord();
+ return super.retain(increment);
+ }
+
+ @Override
+ public boolean release() {
+ maybeRecord();
+ return super.release();
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ maybeRecord();
+ return super.release(decrement);
}
@Override
protected final void deallocate() {
this.recycler.free();
+ if (leak != null) {
+ this.leak.close(this);
+ }
+ }
+
+ @Override
+ public RefCnt touch() {
+ maybeRecord();
+ return this;
}
@Override
public final ReferenceCounted touch(Object hint) {
- throw new UnsupportedOperationException();
+ maybeRecord(hint);
+ return this;
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public Recycler getRecycler() {
+ return recycler;
+ }
+
+ private void maybeRecord() {
+ maybeRecord(null);
+ }
+
+ private void maybeRecord(Object hint) {
+ if (leak != null) {
+ leak.record(hint);
+ }
}
}
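The Netty contract RefCnt now follows, paraphrased as a sketch (not HBase code): detector.track() registers the object and may return null when sampled out below PARANOID level; record() appends a touchpoint to the eventual report; close() must run exactly once on deallocation, otherwise the detector logs a "LEAK:" error the next time a tracked object is garbage collected unreleased and a later allocation drains its reference queue.

    ResourceLeakTracker<RefCnt> leak = detector.track(refCnt); // null if sampled out
    if (leak != null) {
      leak.record("retain() at some call site"); // kept in a bounded record ring
    }
    // ... later, on deallocate():
    if (leak != null) {
      leak.close(refCnt); // marks the object as properly released
    }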
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
index 7cfdcd659d1..d77dc6604be 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.getHeapAllocationRatio;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -45,6 +46,21 @@ public class TestByteBuffAllocator {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
+ @Test
+ public void testRecycleOnlyPooledBuffers() {
+ int maxBuffersInPool = 10;
+ int bufSize = 1024;
+ int minSize = bufSize / 8;
+ ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
+
+ ByteBuff buff = alloc.allocate(minSize - 1);
+ assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+
+ alloc = new ByteBuffAllocator(true, 0, bufSize, minSize);
+ buff = alloc.allocate(minSize * 2);
+ assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+ }
+
@Test
public void testAllocateByteBuffToReadInto() {
int maxBuffersInPool = 10;
@@ -329,8 +345,6 @@ public class TestByteBuffAllocator {
ByteBuff buf = alloc.allocate(bufSize);
assertException(() -> buf.retain(2));
assertException(() -> buf.release(2));
- assertException(() -> buf.touch());
- assertException(() -> buf.touch(new Object()));
}
private void assertException(Runnable r) {
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java
new file mode 100644
index 00000000000..ffc0292902e
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestByteBuffAllocatorLeakDetection {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestByteBuffAllocatorLeakDetection.class);
+
+ @SuppressWarnings("unused")
+ @Test
+ public void testLeakDetection() throws InterruptedException {
+ InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory();
+ AtomicInteger leaksDetected = new AtomicInteger();
+ InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected));
+
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+ assertTrue(ResourceLeakDetector.isEnabled());
+
+ try {
+ int maxBuffersInPool = 10;
+ int bufSize = 1024;
+ int minSize = bufSize / 8;
+ ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
+
+ // tracking leaks happens on creation of a RefCnt, through a call to detector.track().
+ // If a leak occurs, but detector.track() is never called again, the leak will not be
+ // realized. Further, causing a leak requires a GC event. So below we do some allocations,
+ // cause some GCs, do more allocations, and then expect a leak to show up.
+
+ // first allocate on-heap. We expect not to see a leak from this, because we
+ // don't count on-heap references
+ alloc.allocate(minSize - 1);
+ System.gc();
+ Thread.sleep(1000);
+
+ // cause an allocation to trigger a leak detection, if there were one.
+ // keep a reference so we don't trigger a leak right away from this.
+ ByteBuff reference = alloc.allocate(minSize * 2);
+ assertEquals(0, leaksDetected.get());
+
+ // allocate, but don't keep a reference. this should cause a leak
+ alloc.allocate(minSize * 2);
+ System.gc();
+ Thread.sleep(1000);
+
+ // allocate again, this should cause the above leak to be detected
+ alloc.allocate(minSize * 2);
+ assertEquals(1, leaksDetected.get());
+ } finally {
+ InternalLoggerFactory.setDefaultFactory(original);
+ }
+ }
+
+ private static class MockedLoggerFactory extends Slf4JLoggerFactory {
+
+ private AtomicInteger leaksDetected;
+
+ public MockedLoggerFactory(AtomicInteger leaksDetected) {
+ this.leaksDetected = leaksDetected;
+ }
+
+ @Override
+ public InternalLogger newInstance(String name) {
+ InternalLogger delegate = super.newInstance(name);
+ return new MockedLogger(leaksDetected, delegate);
+ }
+ }
+
+ private static class MockedLogger implements InternalLogger {
+
+ private AtomicInteger leaksDetected;
+ private InternalLogger delegate;
+
+ public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) {
+ this.leaksDetected = leaksDetected;
+ this.delegate = delegate;
+ }
+
+ private void maybeCountLeak(String msgOrFormat) {
+ if (msgOrFormat.startsWith("LEAK")) {
+ leaksDetected.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void error(String msg) {
+ maybeCountLeak(msg);
+ delegate.error(msg);
+ }
+
+ @Override
+ public void error(String format, Object arg) {
+ maybeCountLeak(format);
+ delegate.error(format, arg);
+ }
+
+ @Override
+ public void error(String format, Object argA, Object argB) {
+ maybeCountLeak(format);
+ delegate.error(format, argA, argB);
+ }
+
+ @Override
+ public void error(String format, Object... arguments) {
+ maybeCountLeak(format);
+ delegate.error(format, arguments);
+ }
+
+ @Override
+ public void error(String msg, Throwable t) {
+ maybeCountLeak(msg);
+ delegate.error(msg, t);
+ }
+
+ @Override
+ public void error(Throwable t) {
+ delegate.error(t);
+ }
+
+ @Override
+ public String name() {
+ return delegate.name();
+ }
+
+ @Override
+ public boolean isTraceEnabled() {
+ return delegate.isTraceEnabled();
+ }
+
+ @Override
+ public void trace(String msg) {
+ delegate.trace(msg);
+ }
+
+ @Override
+ public void trace(String format, Object arg) {
+ delegate.trace(format, arg);
+ }
+
+ @Override
+ public void trace(String format, Object argA, Object argB) {
+ delegate.trace(format, argA, argB);
+ }
+
+ @Override
+ public void trace(String format, Object... arguments) {
+ delegate.trace(format, arguments);
+ }
+
+ @Override
+ public void trace(String msg, Throwable t) {
+ delegate.trace(msg, t);
+ }
+
+ @Override
+ public void trace(Throwable t) {
+ delegate.trace(t);
+ }
+
+ @Override
+ public boolean isDebugEnabled() {
+ return delegate.isDebugEnabled();
+ }
+
+ @Override
+ public void debug(String msg) {
+ delegate.debug(msg);
+ }
+
+ @Override
+ public void debug(String format, Object arg) {
+ delegate.debug(format, arg);
+ }
+
+ @Override
+ public void debug(String format, Object argA, Object argB) {
+ delegate.debug(format, argA, argB);
+ }
+
+ @Override
+ public void debug(String format, Object... arguments) {
+ delegate.debug(format, arguments);
+ }
+
+ @Override
+ public void debug(String msg, Throwable t) {
+ delegate.debug(msg, t);
+ }
+
+ @Override
+ public void debug(Throwable t) {
+ delegate.debug(t);
+ }
+
+ @Override
+ public boolean isInfoEnabled() {
+ return delegate.isInfoEnabled();
+ }
+
+ @Override
+ public void info(String msg) {
+ delegate.info(msg);
+ }
+
+ @Override
+ public void info(String format, Object arg) {
+ delegate.info(format, arg);
+ }
+
+ @Override
+ public void info(String format, Object argA, Object argB) {
+ delegate.info(format, argA, argB);
+ }
+
+ @Override
+ public void info(String format, Object... arguments) {
+ delegate.info(format, arguments);
+ }
+
+ @Override
+ public void info(String msg, Throwable t) {
+ delegate.info(msg, t);
+ }
+
+ @Override
+ public void info(Throwable t) {
+ delegate.info(t);
+ }
+
+ @Override
+ public boolean isWarnEnabled() {
+ return delegate.isWarnEnabled();
+ }
+
+ @Override
+ public void warn(String msg) {
+ delegate.warn(msg);
+ }
+
+ @Override
+ public void warn(String format, Object arg) {
+ delegate.warn(format, arg);
+ }
+
+ @Override
+ public void warn(String format, Object... arguments) {
+ delegate.warn(format, arguments);
+ }
+
+ @Override
+ public void warn(String format, Object argA, Object argB) {
+ delegate.warn(format, argA, argB);
+ }
+
+ @Override
+ public void warn(String msg, Throwable t) {
+ delegate.warn(msg, t);
+ }
+
+ @Override
+ public void warn(Throwable t) {
+ delegate.warn(t);
+ }
+
+ @Override
+ public boolean isErrorEnabled() {
+ return delegate.isErrorEnabled();
+ }
+
+ @Override
+ public boolean isEnabled(InternalLogLevel level) {
+ return delegate.isEnabled(level);
+ }
+
+ @Override
+ public void log(InternalLogLevel level, String msg) {
+ delegate.log(level, msg);
+ }
+
+ @Override
+ public void log(InternalLogLevel level, String format, Object arg) {
+ delegate.log(level, format, arg);
+ }
+
+ @Override
+ public void log(InternalLogLevel level, String format, Object argA, Object argB) {
+ delegate.log(level, format, argA, argB);
+ }
+
+ @Override
+ public void log(InternalLogLevel level, String format, Object... arguments) {
+ delegate.log(level, format, arguments);
+ }
+
+ @Override
+ public void log(InternalLogLevel level, String msg, Throwable t) {
+ delegate.log(level, msg, t);
+ }
+
+ @Override
+ public void log(InternalLogLevel level, Throwable t) {
+ delegate.log(level, t);
+ }
+ }
+}
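A note for use outside this test: Netty's detector defaults to SIMPLE, which samples roughly 1 in 128 tracked objects (Netty's documented default, not something this commit changes); PARANOID tracks every RefCnt and is what makes the assertions above deterministic. Enabling it programmatically is the same call the test makes:

    // e.g. in a test @BeforeClass or a debug-only startup path
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);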
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index d054e83ac19..0da331897dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -418,6 +418,22 @@ public class HFileBlock implements Cacheable {
return buf.release();
}
+ /**
+ * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
+ * potential buffer leaks. We pass the block itself as a default hint, but one can use
+ * {@link #touch(Object)} to pass their own hint as well.
+ */
+ @Override
+ public HFileBlock touch() {
+ return touch(this);
+ }
+
+ @Override
+ public HFileBlock touch(Object hint) {
+ buf.touch(hint);
+ return this;
+ }
+
/** @return get data block encoding id that was used to encode this block */
short getDataBlockEncodingId() {
if (blockType != BlockType.ENCODED_DATA) {
@@ -616,8 +632,9 @@ public class HFileBlock implements Cacheable {
return this;
}
- HFileBlock unpacked = shallowClone(this);
- unpacked.allocateBuffer(); // allocates space for the decompressed block
+ ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
+ HFileBlock unpacked = shallowClone(this, newBuf);
+
boolean succ = false;
try {
HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
@@ -643,20 +660,21 @@ public class HFileBlock implements Cacheable {
* Always allocates a new buffer of the correct size. Copies header bytes from the existing
* buffer. Does not change header fields. Reserve room to keep checksum bytes too.
*/
- private void allocateBuffer() {
+ private ByteBuff allocateBufferForUnpacking() {
int cksumBytes = totalChecksumBytes();
int headerSize = headerSize();
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+ ByteBuff source = buf.duplicate();
ByteBuff newBuf = allocator.allocate(capacityNeeded);
// Copy header bytes into newBuf.
- buf.position(0);
- newBuf.put(0, buf, 0, headerSize);
+ source.position(0);
+ newBuf.put(0, source, 0, headerSize);
- buf = newBuf;
// set limit to exclude next block's header
- buf.limit(capacityNeeded);
+ newBuf.limit(capacityNeeded);
+ return newBuf;
}
/**
@@ -708,11 +726,6 @@ public class HFileBlock implements Cacheable {
* by default.
*/
public boolean isSharedMem() {
- if (this instanceof SharedMemHFileBlock) {
- return true;
- } else if (this instanceof ExclusiveMemHFileBlock) {
- return false;
- }
return true;
}
@@ -1997,23 +2010,31 @@ public class HFileBlock implements Cacheable {
+ onDiskDataSizeWithHeader;
}
- private static HFileBlockBuilder createBuilder(HFileBlock blk) {
+ /**
+ * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
+ * loaded with all of the original fields from blk, except now using the newBuff and setting
+ * isSharedMem based on the source of the passed-in newBuff. An existing HFileBlock may have been
+ * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
+ * {@link SharedMemHFileBlock}. Or vice versa.
+ * @param blk the block to clone from
+ * @param newBuff the new buffer to use
+ */
+ private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
return new HFileBlockBuilder().withBlockType(blk.blockType)
.withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
.withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
- .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the
- // buffer.
- .withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
+ .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
+ .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
.withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
- .withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem());
+ .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
}
- static HFileBlock shallowClone(HFileBlock blk) {
- return createBuilder(blk).build();
+ private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
+ return createBuilder(blk, newBuf).build();
}
static HFileBlock deepCloneOnHeap(HFileBlock blk) {
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
- return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
+ return createBuilder(blk, deepCloned).build();
}
}
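Why withShared(!newBuff.hasArray()) is a sound signal for the clone's memory type, sketched from the java.nio semantics it relies on (not code from this commit):

    ByteBuffer heap = ByteBuffer.allocate(1024);
    ByteBuffer direct = ByteBuffer.allocateDirect(1024);
    assert heap.hasArray();    // heap buffers expose a backing array
    assert !direct.hasArray(); // direct (pooled, off-heap) buffers do not
    // So an unpacked block backed by pooled direct buffers now builds as a
    // SharedMemHFileBlock that must be released, while a pure heap buffer
    // yields an ExclusiveMemHFileBlock, regardless of what the source block was.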
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
new file mode 100644
index 00000000000..a8399fa6b5a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestHFileBlockUnpack {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);
+
+ private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+ // repetition gives us some chance to get a good compression ratio
+ private static float CHANCE_TO_REPEAT = 0.6f;
+
+ private static final int MIN_ALLOCATION_SIZE = 10 * 1024;
+
+ ByteBuffAllocator allocator;
+
+ @Rule
+ public TestName name = new TestName();
+ private FileSystem fs;
+
+ @Before
+ public void setUp() throws Exception {
+ fs = HFileSystem.get(TEST_UTIL.getConfiguration());
+ Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
+ allocator = ByteBuffAllocator.create(conf, true);
+
+ }
+
+ /**
+ * If the block's on-disk size is less than the {@link ByteBuffAllocator}'s minimum allocation
+ * size, that block will be allocated on heap regardless of the desire for off-heap. After
+ * decompressing the block, the new size may exceed the minimum allocation size. This test
+ * ensures that such decompressed blocks, which will be allocated off-heap, are properly marked
+ * as {@link HFileBlock#isSharedMem()} == true. See
+ * https://issues.apache.org/jira/browse/HBASE-27170
+ */
+ @Test
+ public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ HFileContext meta =
+ new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
+ .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
+
+ Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+ int totalSize;
+ try (FSDataOutputStream os = fs.create(path)) {
+ HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
+ hbw.startWriting(BlockType.DATA);
+ writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
+ hbw.writeHeaderAndData(os);
+ totalSize = hbw.getOnDiskSizeWithHeader();
+ assertTrue(
+ "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
+ totalSize < MIN_ALLOCATION_SIZE);
+ }
+
+ try (FSDataInputStream is = fs.open(path)) {
+ meta =
+ new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
+ .withIncludesMvcc(false).withIncludesTags(false).build();
+ ReaderContext context =
+ new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+ .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
+ HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
+ hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
+ hbr.setIncludesMemStoreTS(false);
+ HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
+ blockFromHFile.sanityCheck();
+ assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
+ assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
+
+ assertTrue(
+ "expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
+ + " to be less than " + MIN_ALLOCATION_SIZE,
+ blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
+ assertTrue(
+ "expected generated block uncompressed size "
+ + blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
+ + MIN_ALLOCATION_SIZE,
+ blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
+
+ HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
+ assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
+ assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
+ }
+ }
+
+ static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
+ Random random = new Random(42);
+
+ byte[] family = new byte[] { 1 };
+ int rowKey = 0;
+ int qualifier = 0;
+ int value = 0;
+ long timestamp = 0;
+
+ int totalSize = 0;
+
+ // go until just up to the limit; compression should bring the total on-disk size under it
+ while (totalSize < desiredSize) {
+ rowKey = maybeIncrement(random, rowKey);
+ qualifier = maybeIncrement(random, qualifier);
+ value = maybeIncrement(random, value);
+ timestamp = maybeIncrement(random, (int) timestamp);
+
+ KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
+ timestamp, Bytes.toBytes(value));
+ hbw.write(keyValue);
+ totalSize += keyValue.getLength();
+ }
+
+ return totalSize;
+ }
+
+ private static int maybeIncrement(Random random, int value) {
+ if (random.nextFloat() < CHANCE_TO_REPEAT) {
+ return value;
+ }
+ return value + 1;
+ }
+
+}