You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/07/04 20:18:52 UTC

[hbase] branch master updated: HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new f3f292fad41 HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)
f3f292fad41 is described below

commit f3f292fad414526e981d299c66ba057765543179
Author: Bryan Beaudreault <bb...@hubspot.com>
AuthorDate: Mon Jul 4 16:18:45 2022 -0400

    HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  |  19 +-
 .../java/org/apache/hadoop/hbase/nio/ByteBuff.java |  23 ++
 .../java/org/apache/hadoop/hbase/nio/RefCnt.java   |  62 +++-
 .../hadoop/hbase/io/TestByteBuffAllocator.java     |  18 +-
 .../io/TestByteBuffAllocatorLeakDetection.java     | 341 +++++++++++++++++++++
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  61 ++--
 .../hbase/io/hfile/TestHFileBlockUnpack.java       | 169 ++++++++++
 7 files changed, 664 insertions(+), 29 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index ecef63662f6..60b89223c16 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -319,11 +319,20 @@ public class ByteBuffAllocator {
       // just allocate the ByteBuffer from on-heap.
       bbs.add(allocateOnHeap(remain));
     }
-    ByteBuff bb = ByteBuff.wrap(bbs, () -> {
-      for (int i = 0; i < lenFromReservoir; i++) {
-        this.putbackBuffer(bbs.get(i));
-      }
-    });
+
+    ByteBuff bb;
+    // we only need a recycler if we successfully pulled from the pool
+    // this matters for determining whether to add leak detection in RefCnt
+    if (lenFromReservoir == 0) {
+      bb = ByteBuff.wrap(bbs);
+    } else {
+      bb = ByteBuff.wrap(bbs, () -> {
+        for (int i = 0; i < lenFromReservoir; i++) {
+          this.putbackBuffer(bbs.get(i));
+        }
+      });
+    }
+
     bb.limit(size);
     return bb;
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 9e2ccc33131..2925fab161b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -547,6 +548,28 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
     return wrap(buffer, RefCnt.create());
   }
 
+  /**
+   * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
+   * potential buffer leaks. We pass the buffer itself as a default hint, but one can use
+   * {@link #touch(Object)} to pass their own hint as well.
+   */
+  @Override
+  public ByteBuff touch() {
+    return touch(this);
+  }
+
+  @Override
+  public ByteBuff touch(Object hint) {
+    refCnt.touch(hint);
+    return this;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public RefCnt getRefCnt() {
+    return refCnt;
+  }
+
   /**
    * Make this private because we don't want to expose the refCnt related wrap method to upstream.
    */
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
index c7b6dbf7086..7c1f23383d3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
 import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetectorFactory;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakTracker;
 
 /**
  * Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
@@ -31,7 +35,10 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
 @InterfaceAudience.Private
 public class RefCnt extends AbstractReferenceCounted {
 
-  private Recycler recycler = ByteBuffAllocator.NONE;
+  private static final ResourceLeakDetector<RefCnt> detector =
+    ResourceLeakDetectorFactory.instance().newResourceLeakDetector(RefCnt.class);
+  private final Recycler recycler;
+  private final ResourceLeakTracker<RefCnt> leak;
 
   /**
    * Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
@@ -49,15 +56,66 @@ public class RefCnt extends AbstractReferenceCounted {
 
   public RefCnt(Recycler recycler) {
     this.recycler = recycler;
+    this.leak = recycler == ByteBuffAllocator.NONE ? null : detector.track(this);
+  }
+
+  @Override
+  public ReferenceCounted retain() {
+    maybeRecord();
+    return super.retain();
+  }
+
+  @Override
+  public ReferenceCounted retain(int increment) {
+    maybeRecord();
+    return super.retain(increment);
+  }
+
+  @Override
+  public boolean release() {
+    maybeRecord();
+    return super.release();
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    maybeRecord();
+    return super.release(decrement);
   }
 
   @Override
   protected final void deallocate() {
     this.recycler.free();
+    if (leak != null) {
+      this.leak.close(this);
+    }
+  }
+
+  @Override
+  public RefCnt touch() {
+    maybeRecord();
+    return this;
   }
 
   @Override
   public final ReferenceCounted touch(Object hint) {
-    throw new UnsupportedOperationException();
+    maybeRecord(hint);
+    return this;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public Recycler getRecycler() {
+    return recycler;
+  }
+
+  private void maybeRecord() {
+    maybeRecord(null);
+  }
+
+  private void maybeRecord(Object hint) {
+    if (leak != null) {
+      leak.record(hint);
+    }
   }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
index 7cfdcd659d1..d77dc6604be 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.getHeapAllocationRatio;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -45,6 +46,21 @@ public class TestByteBuffAllocator {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
 
+  @Test
+  public void testRecycleOnlyPooledBuffers() {
+    int maxBuffersInPool = 10;
+    int bufSize = 1024;
+    int minSize = bufSize / 8;
+    ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
+
+    ByteBuff buff = alloc.allocate(minSize - 1);
+    assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+
+    alloc = new ByteBuffAllocator(true, 0, bufSize, minSize);
+    buff = alloc.allocate(minSize * 2);
+    assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
+  }
+
   @Test
   public void testAllocateByteBuffToReadInto() {
     int maxBuffersInPool = 10;
@@ -329,8 +345,6 @@ public class TestByteBuffAllocator {
     ByteBuff buf = alloc.allocate(bufSize);
     assertException(() -> buf.retain(2));
     assertException(() -> buf.release(2));
-    assertException(() -> buf.touch());
-    assertException(() -> buf.touch(new Object()));
   }
 
   private void assertException(Runnable r) {
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java
new file mode 100644
index 00000000000..ffc0292902e
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocatorLeakDetection.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestByteBuffAllocatorLeakDetection {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestByteBuffAllocatorLeakDetection.class);
+
+  @SuppressWarnings("unused")
+  @Test
+  public void testLeakDetection() throws InterruptedException {
+    InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory();
+    AtomicInteger leaksDetected = new AtomicInteger();
+    InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected));
+
+    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+    assertTrue(ResourceLeakDetector.isEnabled());
+
+    try {
+      int maxBuffersInPool = 10;
+      int bufSize = 1024;
+      int minSize = bufSize / 8;
+      ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
+
+      // tracking leaks happens on creation of a RefCnt, through a call to detector.track().
+      // If a leak occurs, but detector.track() is never called again, the leak will not be
+      // realized. Further, causing a leak requires a GC event. So below we do some allocations,
+      // cause some GC's, do more allocations, and then expect a leak to show up.
+
+      // first allocate on-heap. we expect to not see a leak from this, because we
+      // dont count on-heap references
+      alloc.allocate(minSize - 1);
+      System.gc();
+      Thread.sleep(1000);
+
+      // cause an allocation to trigger a leak detection, if there were one.
+      // keep a reference so we don't trigger a leak right away from this.
+      ByteBuff reference = alloc.allocate(minSize * 2);
+      assertEquals(0, leaksDetected.get());
+
+      // allocate, but don't keep a reference. this should cause a leak
+      alloc.allocate(minSize * 2);
+      System.gc();
+      Thread.sleep(1000);
+
+      // allocate again, this should cause the above leak to be detected
+      alloc.allocate(minSize * 2);
+      assertEquals(1, leaksDetected.get());
+    } finally {
+      InternalLoggerFactory.setDefaultFactory(original);
+    }
+  }
+
+  private static class MockedLoggerFactory extends Slf4JLoggerFactory {
+
+    private AtomicInteger leaksDetected;
+
+    public MockedLoggerFactory(AtomicInteger leaksDetected) {
+      this.leaksDetected = leaksDetected;
+    }
+
+    @Override
+    public InternalLogger newInstance(String name) {
+      InternalLogger delegate = super.newInstance(name);
+      return new MockedLogger(leaksDetected, delegate);
+    }
+  }
+
+  private static class MockedLogger implements InternalLogger {
+
+    private AtomicInteger leaksDetected;
+    private InternalLogger delegate;
+
+    public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) {
+      this.leaksDetected = leaksDetected;
+      this.delegate = delegate;
+    }
+
+    private void maybeCountLeak(String msgOrFormat) {
+      if (msgOrFormat.startsWith("LEAK")) {
+        leaksDetected.incrementAndGet();
+      }
+    }
+
+    @Override
+    public void error(String msg) {
+      maybeCountLeak(msg);
+      delegate.error(msg);
+    }
+
+    @Override
+    public void error(String format, Object arg) {
+      maybeCountLeak(format);
+      delegate.error(format, arg);
+    }
+
+    @Override
+    public void error(String format, Object argA, Object argB) {
+      maybeCountLeak(format);
+      delegate.error(format, argA, argB);
+    }
+
+    @Override
+    public void error(String format, Object... arguments) {
+      maybeCountLeak(format);
+      delegate.error(format, arguments);
+    }
+
+    @Override
+    public void error(String msg, Throwable t) {
+      maybeCountLeak(msg);
+      delegate.error(msg, t);
+    }
+
+    @Override
+    public void error(Throwable t) {
+      delegate.error(t);
+    }
+
+    @Override
+    public String name() {
+      return delegate.name();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+      return delegate.isTraceEnabled();
+    }
+
+    @Override
+    public void trace(String msg) {
+      delegate.trace(msg);
+    }
+
+    @Override
+    public void trace(String format, Object arg) {
+      delegate.trace(format, arg);
+    }
+
+    @Override
+    public void trace(String format, Object argA, Object argB) {
+      delegate.trace(format, argA, argB);
+    }
+
+    @Override
+    public void trace(String format, Object... arguments) {
+      delegate.trace(format, arguments);
+    }
+
+    @Override
+    public void trace(String msg, Throwable t) {
+      delegate.trace(msg, t);
+    }
+
+    @Override
+    public void trace(Throwable t) {
+      delegate.trace(t);
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+      return delegate.isDebugEnabled();
+    }
+
+    @Override
+    public void debug(String msg) {
+      delegate.debug(msg);
+    }
+
+    @Override
+    public void debug(String format, Object arg) {
+      delegate.debug(format, arg);
+    }
+
+    @Override
+    public void debug(String format, Object argA, Object argB) {
+      delegate.debug(format, argA, argB);
+    }
+
+    @Override
+    public void debug(String format, Object... arguments) {
+      delegate.debug(format, arguments);
+    }
+
+    @Override
+    public void debug(String msg, Throwable t) {
+      delegate.debug(msg, t);
+    }
+
+    @Override
+    public void debug(Throwable t) {
+      delegate.debug(t);
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+      return delegate.isInfoEnabled();
+    }
+
+    @Override
+    public void info(String msg) {
+      delegate.info(msg);
+    }
+
+    @Override
+    public void info(String format, Object arg) {
+      delegate.info(format, arg);
+    }
+
+    @Override
+    public void info(String format, Object argA, Object argB) {
+      delegate.info(format, argA, argB);
+    }
+
+    @Override
+    public void info(String format, Object... arguments) {
+      delegate.info(format, arguments);
+    }
+
+    @Override
+    public void info(String msg, Throwable t) {
+      delegate.info(msg, t);
+    }
+
+    @Override
+    public void info(Throwable t) {
+      delegate.info(t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+      return delegate.isWarnEnabled();
+    }
+
+    @Override
+    public void warn(String msg) {
+      delegate.warn(msg);
+    }
+
+    @Override
+    public void warn(String format, Object arg) {
+      delegate.warn(format, arg);
+    }
+
+    @Override
+    public void warn(String format, Object... arguments) {
+      delegate.warn(format, arguments);
+    }
+
+    @Override
+    public void warn(String format, Object argA, Object argB) {
+      delegate.warn(format, argA, argB);
+    }
+
+    @Override
+    public void warn(String msg, Throwable t) {
+      delegate.warn(msg, t);
+    }
+
+    @Override
+    public void warn(Throwable t) {
+      delegate.warn(t);
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+      return delegate.isErrorEnabled();
+    }
+
+    @Override
+    public boolean isEnabled(InternalLogLevel level) {
+      return delegate.isEnabled(level);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String msg) {
+      delegate.log(level, msg);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String format, Object arg) {
+      delegate.log(level, format, arg);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String format, Object argA, Object argB) {
+      delegate.log(level, format, argA, argB);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String format, Object... arguments) {
+      delegate.log(level, format, arguments);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, String msg, Throwable t) {
+      delegate.log(level, msg, t);
+    }
+
+    @Override
+    public void log(InternalLogLevel level, Throwable t) {
+      delegate.log(level, t);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index d054e83ac19..0da331897dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -418,6 +418,22 @@ public class HFileBlock implements Cacheable {
     return buf.release();
   }
 
+  /**
+   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
+   * potential buffer leaks. We pass the block itself as a default hint, but one can use
+   * {@link #touch(Object)} to pass their own hint as well.
+   */
+  @Override
+  public HFileBlock touch() {
+    return touch(this);
+  }
+
+  @Override
+  public HFileBlock touch(Object hint) {
+    buf.touch(hint);
+    return this;
+  }
+
   /** @return get data block encoding id that was used to encode this block */
   short getDataBlockEncodingId() {
     if (blockType != BlockType.ENCODED_DATA) {
@@ -616,8 +632,9 @@ public class HFileBlock implements Cacheable {
       return this;
     }
 
-    HFileBlock unpacked = shallowClone(this);
-    unpacked.allocateBuffer(); // allocates space for the decompressed block
+    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
+    HFileBlock unpacked = shallowClone(this, newBuf);
+
     boolean succ = false;
     try {
       HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
@@ -643,20 +660,21 @@ public class HFileBlock implements Cacheable {
    * Always allocates a new buffer of the correct size. Copies header bytes from the existing
    * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
    */
-  private void allocateBuffer() {
+  private ByteBuff allocateBufferForUnpacking() {
     int cksumBytes = totalChecksumBytes();
     int headerSize = headerSize();
     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
 
+    ByteBuff source = buf.duplicate();
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
 
     // Copy header bytes into newBuf.
-    buf.position(0);
-    newBuf.put(0, buf, 0, headerSize);
+    source.position(0);
+    newBuf.put(0, source, 0, headerSize);
 
-    buf = newBuf;
     // set limit to exclude next block's header
-    buf.limit(capacityNeeded);
+    newBuf.limit(capacityNeeded);
+    return newBuf;
   }
 
   /**
@@ -708,11 +726,6 @@ public class HFileBlock implements Cacheable {
    * by default.
    */
   public boolean isSharedMem() {
-    if (this instanceof SharedMemHFileBlock) {
-      return true;
-    } else if (this instanceof ExclusiveMemHFileBlock) {
-      return false;
-    }
     return true;
   }
 
@@ -1997,23 +2010,31 @@ public class HFileBlock implements Cacheable {
       + onDiskDataSizeWithHeader;
   }
 
-  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
+  /**
+   * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
+   * loaded with all of the original fields from blk, except now using the newBuff and setting
+   * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
+   * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
+   * {@link SharedMemHFileBlock}. Or vice versa.
+   * @param blk     the block to clone from
+   * @param newBuff the new buffer to use
+   */
+  private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
     return new HFileBlockBuilder().withBlockType(blk.blockType)
       .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
       .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
-      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the
-                                                                                  // buffer.
-      .withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
+      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
+      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
       .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
-      .withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem());
+      .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
   }
 
-  static HFileBlock shallowClone(HFileBlock blk) {
-    return createBuilder(blk).build();
+  private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
+    return createBuilder(blk, newBuf).build();
   }
 
   static HFileBlock deepCloneOnHeap(HFileBlock blk) {
     ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
-    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
+    return createBuilder(blk, deepCloned).build();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
new file mode 100644
index 00000000000..a8399fa6b5a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestHFileBlockUnpack {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  // repetition gives us some chance to get a good compression ratio
+  private static float CHANCE_TO_REPEAT = 0.6f;
+
+  private static final int MIN_ALLOCATION_SIZE = 10 * 1024;
+
+  ByteBuffAllocator allocator;
+
+  @Rule
+  public TestName name = new TestName();
+  private FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
+    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
+    allocator = ByteBuffAllocator.create(conf, true);
+
+  }
+
+  /**
+   * If the block on disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
+   * block will be allocated to heap regardless of desire for off-heap. After de-compressing the
+   * block, the new size may now exceed the min allocation size. This test ensures that those
+   * de-compressed blocks, which will be allocated off-heap, are properly marked as
+   * {@link HFileBlock#isSharedMem()} == true See https://issues.apache.org/jira/browse/HBASE-27170
+   */
+  @Test
+  public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    HFileContext meta =
+      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
+        .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
+
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize;
+    try (FSDataOutputStream os = fs.create(path)) {
+      HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
+      hbw.startWriting(BlockType.DATA);
+      writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
+      hbw.writeHeaderAndData(os);
+      totalSize = hbw.getOnDiskSizeWithHeader();
+      assertTrue(
+        "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
+        totalSize < MIN_ALLOCATION_SIZE);
+    }
+
+    try (FSDataInputStream is = fs.open(path)) {
+      meta =
+        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
+          .withIncludesMvcc(false).withIncludesTags(false).build();
+      ReaderContext context =
+        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
+      HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
+      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
+      hbr.setIncludesMemStoreTS(false);
+      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
+      blockFromHFile.sanityCheck();
+      assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
+      assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
+
+      assertTrue(
+        "expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
+          + " to be less than " + MIN_ALLOCATION_SIZE,
+        blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
+      assertTrue(
+        "expected generated block uncompressed size "
+          + blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
+          + MIN_ALLOCATION_SIZE,
+        blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
+
+      HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
+      assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
+      assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
+    }
+  }
+
+  static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
+    Random random = new Random(42);
+
+    byte[] family = new byte[] { 1 };
+    int rowKey = 0;
+    int qualifier = 0;
+    int value = 0;
+    long timestamp = 0;
+
+    int totalSize = 0;
+
+    // go until just up to the limit. compression should bring the total on-disk size under
+    while (totalSize < desiredSize) {
+      rowKey = maybeIncrement(random, rowKey);
+      qualifier = maybeIncrement(random, qualifier);
+      value = maybeIncrement(random, value);
+      timestamp = maybeIncrement(random, (int) timestamp);
+
+      KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
+        timestamp, Bytes.toBytes(value));
+      hbw.write(keyValue);
+      totalSize += keyValue.getLength();
+    }
+
+    return totalSize;
+  }
+
+  private static int maybeIncrement(Random random, int value) {
+    if (random.nextFloat() < CHANCE_TO_REPEAT) {
+      return value;
+    }
+    return value + 1;
+  }
+
+}