You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/24 02:31:07 UTC

[hbase] 11/23: HBASE-22122 Change to release mob hfile's block after rpc server shipped response to client

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit ca92378e4245937761ab31bbe823841ae4b52629
Author: huzheng <op...@gmail.com>
AuthorDate: Wed Apr 17 11:54:15 2019 +0800

    HBASE-22122 Change to release mob hfile's block after rpc server shipped response to client
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  28 ++---
 .../java/org/apache/hadoop/hbase/mob/MobCell.java  |  74 +++++++++++++
 .../java/org/apache/hadoop/hbase/mob/MobFile.java  |  21 ++--
 .../hadoop/hbase/regionserver/HMobStore.java       |  63 ++++++------
 .../hadoop/hbase/regionserver/MobStoreScanner.java |  41 +++++++-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   2 +-
 .../regionserver/ReversedMobStoreScanner.java      |  47 +++++++--
 .../hbase/regionserver/StoreFileScanner.java       |  12 ---
 .../apache/hadoop/hbase/mob/TestCachedMobFile.java |  19 ++--
 .../org/apache/hadoop/hbase/mob/TestMobFile.java   |  26 ++---
 .../hbase/mob/TestMobWithByteBuffAllocator.java    | 114 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/TestHMobStore.java   |  19 ++--
 12 files changed, 347 insertions(+), 119 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 062bec6..ee1a53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -244,19 +244,21 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
                 writer.append(c);
               } else {
                 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
-                // the mob cell from the mob file, and write it back to the store file.
-                Cell mobCell = mobStore.resolve(c, false);
-                if (mobCell.getValueLength() != 0) {
-                  // put the mob data back to the store file
-                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
-                  writer.append(mobCell);
-                  cellsCountCompactedFromMob++;
-                  cellsSizeCompactedFromMob += mobCell.getValueLength();
-                } else {
-                  // If the value of a file is empty, there might be issues when retrieving,
-                  // directly write the cell to the store file, and leave it to be handled by the
-                  // next compaction.
-                  writer.append(c);
+                // the mob cell from the mob file, and write it back to the store file. Must
+                // close the mob scanner once the life cycle finished.
+                try (MobCell mobCell = mobStore.resolve(c, false)) {
+                  if (mobCell.getCell().getValueLength() != 0) {
+                    // put the mob data back to the store file
+                    PrivateCellUtil.setSequenceId(mobCell.getCell(), c.getSequenceId());
+                    writer.append(mobCell.getCell());
+                    cellsCountCompactedFromMob++;
+                    cellsSizeCompactedFromMob += mobCell.getCell().getValueLength();
+                  } else {
+                    // If the value of a file is empty, there might be issues when retrieving,
+                    // directly write the cell to the store file, and leave it to be handled by the
+                    // next compaction.
+                    writer.append(c);
+                  }
                 }
               }
             } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java
new file mode 100644
index 0000000..ec956a2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mob;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The MobCell will maintain a {@link Cell} and a {@link StoreFileScanner} inside. Now, the mob cell
+ * is backed by NIO ByteBuffers which are allocated from ByteBuffAllocator, so we cannot just read
+ * the cell and close the MOB file scanner because the MOB file scanner closing will deallocate the
+ * NIO ByteBuffers, which would result in a memory leak.
+ * <p>
+ * Actually, the right solution is: <br>
+ * 1. Read the normal cell; <br>
+ * 2. Parse the value of normal cell and get MOB fileName,offset,length; <br>
+ * 3. Open scanner to read the mob value; <br>
+ * 4. Construct the response cell whose key is from the normal cell and value is from the mob cell.
+ * <br>
+ * 5. Ship the response cell to HBase client. <br>
+ * 6. Release both normal cell's block and mob cell's block. <br>
+ * <p>
+ * For a mob cell, releasing the block just means closing the mob scanner, so here we need to keep
+ * the {@link StoreFileScanner} inside and close it only once we are sure that the MobCell has been
+ * shipped to the RPC client.
+ */
+@InterfaceAudience.Private
+public class MobCell implements Closeable {
+
+  private final Cell cell;
+  private final StoreFileScanner sfScanner;
+
+  public MobCell(Cell cell) {
+    this.cell = cell;
+    this.sfScanner = null;
+  }
+
+  public MobCell(Cell cell, StoreFileScanner sfScanner) {
+    this.cell = cell;
+    this.sfScanner = sfScanner;
+  }
+
+  public Cell getCell() {
+    return cell;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.sfScanner != null) {
+      this.sfScanner.close();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
index 1d0d5ff..43abd39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mob;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -70,7 +71,7 @@ public class MobFile {
    * @return The cell in the mob file.
    * @throws IOException
    */
-  public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
+  public MobCell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
     return readCell(search, cacheMobBlocks, sf.getMaxMemStoreTS());
   }
 
@@ -82,26 +83,26 @@ public class MobFile {
    * @return The cell in the mob file.
    * @throws IOException
    */
-  public Cell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException {
-    Cell result = null;
+  public MobCell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException {
     StoreFileScanner scanner = null;
-    List<HStoreFile> sfs = new ArrayList<>();
-    sfs.add(sf);
+    boolean succ = false;
     try {
-      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
-        cacheMobBlocks, true, false, false, readPt);
+      List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
+        Collections.singletonList(sf), cacheMobBlocks, true, false, false, readPt);
       if (!sfScanners.isEmpty()) {
         scanner = sfScanners.get(0);
         if (scanner.seek(search)) {
-          result = scanner.peek();
+          MobCell mobCell = new MobCell(scanner.peek(), scanner);
+          succ = true;
+          return mobCell;
         }
       }
+      return null;
     } finally {
-      if (scanner != null) {
+      if (scanner != null && !succ) {
         scanner.close();
       }
     }
-    return result;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 596aa3d..b8ea960 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
+import org.apache.hadoop.hbase.mob.MobCell;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFile;
 import org.apache.hadoop.hbase.mob.MobFileCache;
@@ -298,14 +298,14 @@ public class HMobStore extends HStore {
   }
 
   /**
-   * Reads the cell from the mob file, and the read point does not count.
-   * This is used for DefaultMobStoreCompactor where we can read empty value for the missing cell.
+   * Reads the cell from the mob file, and the read point does not count. This is used for
+   * DefaultMobStoreCompactor where we can read empty value for the missing cell.
    * @param reference The cell found in the HBase, its value is a path to a mob file.
    * @param cacheBlocks Whether the scanner should cache blocks.
    * @return The cell found in the mob file.
    * @throws IOException
    */
-  public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+  public MobCell resolve(Cell reference, boolean cacheBlocks) throws IOException {
     return resolve(reference, cacheBlocks, -1, true);
   }
 
@@ -314,14 +314,14 @@ public class HMobStore extends HStore {
    * @param reference The cell found in the HBase, its value is a path to a mob file.
    * @param cacheBlocks Whether the scanner should cache blocks.
    * @param readPt the read point.
-   * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is
-   *        missing or corrupt.
+   * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or
+   *          corrupt.
    * @return The cell found in the mob file.
    * @throws IOException
    */
-  public Cell resolve(Cell reference, boolean cacheBlocks, long readPt,
-    boolean readEmptyValueOnMobCellMiss) throws IOException {
-    Cell result = null;
+  public MobCell resolve(Cell reference, boolean cacheBlocks, long readPt,
+      boolean readEmptyValueOnMobCellMiss) throws IOException {
+    MobCell mobCell = null;
     if (MobUtils.hasValidMobRefCellValue(reference)) {
       String fileName = MobUtils.getMobFileName(reference);
       Tag tableNameTag = MobUtils.getTableNameTag(reference);
@@ -336,35 +336,34 @@ public class HMobStore extends HStore {
               locations = new ArrayList<>(2);
               TableName tn = TableName.valueOf(tableNameString);
               locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
-              locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
-                  .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
+              locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
+                MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
               map.put(tableNameString, locations);
             }
           } finally {
             keyLock.releaseLockEntry(lockEntry);
           }
         }
-        result = readCell(locations, fileName, reference, cacheBlocks, readPt,
+        mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
           readEmptyValueOnMobCellMiss);
       }
     }
-    if (result == null) {
+    if (mobCell == null) {
       LOG.warn("The Cell result is null, assemble a new Cell with the same row,family,"
           + "qualifier,timestamp,type and tags but with an empty value to return.");
-      result = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
-              .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
-              .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
-                reference.getFamilyLength())
-              .setQualifier(reference.getQualifierArray(),
-                reference.getQualifierOffset(), reference.getQualifierLength())
-              .setTimestamp(reference.getTimestamp())
-              .setType(reference.getTypeByte())
-              .setValue(HConstants.EMPTY_BYTE_ARRAY)
-              .setTags(reference.getTagsArray(), reference.getTagsOffset(),
-                reference.getTagsLength())
-              .build();
+      Cell cell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
+          .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
+          .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
+            reference.getFamilyLength())
+          .setQualifier(reference.getQualifierArray(), reference.getQualifierOffset(),
+            reference.getQualifierLength())
+          .setTimestamp(reference.getTimestamp()).setType(reference.getTypeByte())
+          .setValue(HConstants.EMPTY_BYTE_ARRAY)
+          .setTags(reference.getTagsArray(), reference.getTagsOffset(), reference.getTagsLength())
+          .build();
+      mobCell = new MobCell(cell);
     }
-    return result;
+    return mobCell;
   }
 
   /**
@@ -383,8 +382,8 @@ public class HMobStore extends HStore {
    * @return The found cell. Null if there's no such a cell.
    * @throws IOException
    */
-  private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
-    long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
+  private MobCell readCell(List<Path> locations, String fileName, Cell search,
+      boolean cacheMobBlocks, long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
     FileSystem fs = getFileSystem();
     Throwable throwable = null;
     for (Path location : locations) {
@@ -392,12 +391,8 @@ public class HMobStore extends HStore {
       Path path = new Path(location, fileName);
       try {
         file = mobFileCache.openFile(fs, path, cacheConf);
-        Cell cell = readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
+        return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
             : file.readCell(search, cacheMobBlocks);
-        // Now we will return blocks to allocator for mob cells before shipping to rpc client.
-        // it will be memory leak. so just copy cell as an on-heap KV here. will remove this in
-        // HBASE-22122 (TODO)
-        return KeyValueUtil.copyToNewKeyValue(cell);
       } catch (IOException e) {
         mobFileCache.evictFile(fileName);
         throwable = e;
@@ -425,7 +420,7 @@ public class HMobStore extends HStore {
       }
     }
     LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
-      + " or it is corrupt");
+        + " or it is corrupt");
     if (readEmptyValueOnMobCellMiss) {
       return null;
     } else if ((throwable instanceof FileNotFoundException)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
index b9f9af8..76144f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -19,13 +19,17 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobCell;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into
@@ -34,10 +38,13 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class MobStoreScanner extends StoreScanner {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MobStoreScanner.class);
+
   private boolean cacheMobBlocks = false;
   private boolean rawMobScan = false;
   private boolean readEmptyValueOnMobCellMiss = false;
   private final HMobStore mobStore;
+  private final List<MobCell> referencedMobCells;
 
   public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
       final NavigableSet<byte[]> columns, long readPt) throws IOException {
@@ -49,6 +56,7 @@ public class MobStoreScanner extends StoreScanner {
       throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
     }
     mobStore = (HMobStore) store;
+    this.referencedMobCells = new ArrayList<>();
   }
 
   /**
@@ -69,11 +77,13 @@ public class MobStoreScanner extends StoreScanner {
       for (int i = 0; i < outResult.size(); i++) {
         Cell cell = outResult.get(i);
         if (MobUtils.isMobReferenceCell(cell)) {
-          Cell mobCell = mobStore
-            .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
+          MobCell mobCell =
+              mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
           mobKVCount++;
-          mobKVSize += mobCell.getValueLength();
-          outResult.set(i, mobCell);
+          mobKVSize += mobCell.getCell().getValueLength();
+          outResult.set(i, mobCell.getCell());
+          // Keep the MobCell here until the RPC response is shipped or the scanner is closed.
+          referencedMobCells.add(mobCell);
         }
       }
       mobStore.updateMobScanCellsCount(mobKVCount);
@@ -81,4 +91,27 @@ public class MobStoreScanner extends StoreScanner {
     }
     return result;
   }
+
+  private void freeAllReferencedMobCells() throws IOException {
+    for (MobCell cell : referencedMobCells) {
+      cell.close();
+    }
+    referencedMobCells.clear();
+  }
+
+  @Override
+  public void shipped() throws IOException {
+    super.shipped();
+    this.freeAllReferencedMobCells();
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    try {
+      this.freeAllReferencedMobCells();
+    } catch (IOException e) {
+      LOG.warn("Failed to free referenced mob cells: ", e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1586f1c..2b8bba0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -291,7 +291,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
 
-  protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
+  public static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
 
   // Request counter. (Includes requests that are not serviced by regions.)
   // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
index d64c372..a3d779c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@ -19,26 +19,31 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobCell;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support
- * reversed scanning in both the memstore and the MOB store.
- *
+ * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support reversed
+ * scanning in both the memstore and the MOB store.
  */
 @InterfaceAudience.Private
 public class ReversedMobStoreScanner extends ReversedStoreScanner {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ReversedMobStoreScanner.class);
   private boolean cacheMobBlocks = false;
   private boolean rawMobScan = false;
   private boolean readEmptyValueOnMobCellMiss = false;
-  protected final HMobStore mobStore;
+  private final HMobStore mobStore;
+  private final List<MobCell> referencedMobCells;
 
   ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
       long readPt) throws IOException {
@@ -50,6 +55,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
       throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
     }
     mobStore = (HMobStore) store;
+    this.referencedMobCells = new ArrayList<>();
   }
 
   /**
@@ -70,11 +76,13 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
       for (int i = 0; i < outResult.size(); i++) {
         Cell cell = outResult.get(i);
         if (MobUtils.isMobReferenceCell(cell)) {
-          Cell mobCell = mobStore
-            .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
+          MobCell mobCell =
+              mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
           mobKVCount++;
-          mobKVSize += mobCell.getValueLength();
-          outResult.set(i, mobCell);
+          mobKVSize += mobCell.getCell().getValueLength();
+          outResult.set(i, mobCell.getCell());
+          // Keep the MobCell here until the RPC response is shipped or the scanner is closed.
+          referencedMobCells.add(mobCell);
         }
       }
       mobStore.updateMobScanCellsCount(mobKVCount);
@@ -82,4 +90,27 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
     }
     return result;
   }
+
+  private void freeAllReferencedMobCells() throws IOException {
+    for (MobCell mobCell : referencedMobCells) {
+      mobCell.close();
+    }
+    referencedMobCells.clear();
+  }
+
+  @Override
+  public void shipped() throws IOException {
+    super.shipped();
+    this.freeAllReferencedMobCells();
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    try {
+      this.freeAllReferencedMobCells();
+    } catch (IOException e) {
+      LOG.warn("Failed to free referenced mob cells: ", e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index b5b853a..6e70c5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -97,18 +97,6 @@ public class StoreFileScanner implements KeyValueScanner {
     this.reader.incrementRefCount();
   }
 
-  boolean isPrimaryReplica() {
-    return reader.isPrimaryReplicaReader();
-  }
-
-  /**
-   * Return an array of scanners corresponding to the given set of store files.
-   */
-  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
-      boolean cacheBlocks, boolean usePread, long readPt) throws IOException {
-    return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt);
-  }
-
   /**
    * Return an array of scanners corresponding to the given set of store files.
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
index bb194b6..d274db3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
@@ -113,46 +113,45 @@ public class TestCachedMobFile {
     Path testDir = TEST_UTIL.getDataTestDir();
     FileSystem fs = testDir.getFileSystem(conf);
     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
-    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
-        .withOutputDir(testDir).withFileContext(meta).build();
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir)
+        .withFileContext(meta).build();
     String caseName = testName.getMethodName();
     MobTestUtil.writeStoreFile(writer, caseName);
     CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
     byte[] family = Bytes.toBytes(caseName);
     byte[] qualify = Bytes.toBytes(caseName);
     // Test the start key
-    byte[] startKey = Bytes.toBytes("aa");  // The start key bytes
+    byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
     KeyValue expectedKey =
         new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
     KeyValue seekKey = expectedKey.createKeyOnly(false);
-    Cell cell = cachedMobFile.readCell(seekKey, false);
+    Cell cell = cachedMobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the end key
-    byte[] endKey = Bytes.toBytes("zz");  // The end key bytes
+    byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
     expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
     seekKey = expectedKey.createKeyOnly(false);
-    cell = cachedMobFile.readCell(seekKey, false);
+    cell = cachedMobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the random key
     byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
     expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
     seekKey = expectedKey.createKeyOnly(false);
-    cell = cachedMobFile.readCell(seekKey, false);
+    cell = cachedMobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the key which is less than the start key
     byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
     expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
     seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
-    cell = cachedMobFile.readCell(seekKey, false);
+    cell = cachedMobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the key which is more than the end key
     byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
     seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
-    cell = cachedMobFile.readCell(seekKey, false);
-    Assert.assertNull(cell);
+    Assert.assertNull(cachedMobFile.readCell(seekKey, false));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
index c22ca98..297c19f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
@@ -43,8 +43,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Category(SmallTests.class)
 public class TestMobFile {
@@ -53,7 +51,6 @@ public class TestMobFile {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMobFile.class);
 
-  static final Logger LOG = LoggerFactory.getLogger(TestMobFile.class);
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private Configuration conf = TEST_UTIL.getConfiguration();
   private CacheConfig cacheConf =  new CacheConfig(conf);
@@ -64,11 +61,9 @@ public class TestMobFile {
   public void testReadKeyValue() throws Exception {
     Path testDir = TEST_UTIL.getDataTestDir();
     FileSystem fs = testDir.getFileSystem(conf);
-    HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
-    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
-            .withOutputDir(testDir)
-            .withFileContext(meta)
-            .build();
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir)
+        .withFileContext(meta).build();
     String caseName = testName.getMethodName();
     MobTestUtil.writeStoreFile(writer, caseName);
 
@@ -78,39 +73,38 @@ public class TestMobFile {
     byte[] qualify = Bytes.toBytes(caseName);
 
     // Test the start key
-    byte[] startKey = Bytes.toBytes("aa");  // The start key bytes
+    byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
     KeyValue expectedKey =
         new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
     KeyValue seekKey = expectedKey.createKeyOnly(false);
-    Cell cell = mobFile.readCell(seekKey, false);
+    Cell cell = mobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the end key
-    byte[] endKey = Bytes.toBytes("zz");  // The end key bytes
+    byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
     expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
     seekKey = expectedKey.createKeyOnly(false);
-    cell = mobFile.readCell(seekKey, false);
+    cell = mobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the random key
     byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
     expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
     seekKey = expectedKey.createKeyOnly(false);
-    cell = mobFile.readCell(seekKey, false);
+    cell = mobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the key which is less than the start key
     byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
     expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
     seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
-    cell = mobFile.readCell(seekKey, false);
+    cell = mobFile.readCell(seekKey, false).getCell();
     MobTestUtil.assertCellEquals(expectedKey, cell);
 
     // Test the key which is more than the end key
     byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
     seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
-    cell = mobFile.readCell(seekKey, false);
-    assertNull(cell);
+    assertNull(mobFile.readCell(seekKey, false));
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java
new file mode 100644
index 0000000..a527740
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the MOB feature when the RPC ByteBuffAllocator is enabled (HBASE-22122).
+ */
+@Category({ MediumTests.class })
+public class TestMobWithByteBuffAllocator {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMobWithByteBuffAllocator.class);
+
+  private static final String TABLE_NAME = "TestMobWithByteBuffAllocator";
+  private static final Logger LOG = LoggerFactory.getLogger(TestMobWithByteBuffAllocator.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final Configuration CONF = UTIL.getConfiguration();
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // Must use the ByteBuffAllocator here
+    CONF.setBoolean(RSRpcServices.RESERVOIR_ENABLED_KEY, true);
+    // Must use OFF-HEAP BucketCache here.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.1f);
+    CONF.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    // 32MB for BucketCache.
+    CONF.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 32);
+    CONF.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReadingCellsFromHFile() throws Exception {
+    TableName tableName = TableName.valueOf(TABLE_NAME);
+    MobSnapshotTestingUtils.createMobTable(UTIL, tableName, 1, FAMILY);
+    LOG.info("Create an mob table {} successfully.", tableName);
+
+    int expectedRows = 500;
+    SnapshotTestingUtils.loadData(UTIL, tableName, expectedRows, FAMILY);
+    LOG.info("Load 500 rows data into table {} successfully.", tableName);
+
+    // Flush all the data into HFiles.
+    try (Admin admin = UTIL.getConnection().getAdmin()) {
+      admin.flush(tableName);
+    }
+
+    // Scan the rows
+    MobSnapshotTestingUtils.verifyMobRowCount(UTIL, tableName, expectedRows);
+
+    // Scan the rows in reverse order
+    int rows = 0;
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) {
+        for (Result res; (res = scanner.next()) != null;) {
+          rows++;
+          for (Cell cell : res.listCells()) {
+            Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
+          }
+        }
+      }
+    }
+    Assert.assertEquals(expectedRows, rows);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index bf1f18e..152ea87 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -448,17 +448,14 @@ public class TestHMobStore {
     String targetPathName = MobUtils.formatDate(currentDate);
     Path targetPath = new Path(store.getPath(), targetPathName);
     store.commitFile(mobFilePath, targetPath);
-    //resolve
-    Cell resultCell1 = store.resolve(seekKey1, false);
-    Cell resultCell2 = store.resolve(seekKey2, false);
-    Cell resultCell3 = store.resolve(seekKey3, false);
-    //compare
-    Assert.assertEquals(Bytes.toString(value),
-        Bytes.toString(CellUtil.cloneValue(resultCell1)));
-    Assert.assertEquals(Bytes.toString(value),
-        Bytes.toString(CellUtil.cloneValue(resultCell2)));
-    Assert.assertEquals(Bytes.toString(value2),
-        Bytes.toString(CellUtil.cloneValue(resultCell3)));
+    // resolve
+    Cell resultCell1 = store.resolve(seekKey1, false).getCell();
+    Cell resultCell2 = store.resolve(seekKey2, false).getCell();
+    Cell resultCell3 = store.resolve(seekKey3, false).getCell();
+    // compare
+    Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell1)));
+    Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell2)));
+    Assert.assertEquals(Bytes.toString(value2), Bytes.toString(CellUtil.cloneValue(resultCell3)));
   }
 
   /**