You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/24 02:31:07 UTC
[hbase] 11/23: HBASE-22122 Change to release mob hfile's block
after rpc server shipped response to client
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ca92378e4245937761ab31bbe823841ae4b52629
Author: huzheng <op...@gmail.com>
AuthorDate: Wed Apr 17 11:54:15 2019 +0800
HBASE-22122 Change to release mob hfile's block after rpc server shipped response to client
---
.../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 28 ++---
.../java/org/apache/hadoop/hbase/mob/MobCell.java | 74 +++++++++++++
.../java/org/apache/hadoop/hbase/mob/MobFile.java | 21 ++--
.../hadoop/hbase/regionserver/HMobStore.java | 63 ++++++------
.../hadoop/hbase/regionserver/MobStoreScanner.java | 41 +++++++-
.../hadoop/hbase/regionserver/RSRpcServices.java | 2 +-
.../regionserver/ReversedMobStoreScanner.java | 47 +++++++--
.../hbase/regionserver/StoreFileScanner.java | 12 ---
.../apache/hadoop/hbase/mob/TestCachedMobFile.java | 19 ++--
.../org/apache/hadoop/hbase/mob/TestMobFile.java | 26 ++---
.../hbase/mob/TestMobWithByteBuffAllocator.java | 114 +++++++++++++++++++++
.../hadoop/hbase/regionserver/TestHMobStore.java | 19 ++--
12 files changed, 347 insertions(+), 119 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 062bec6..ee1a53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -244,19 +244,21 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
writer.append(c);
} else {
// If the value is not larger than the threshold, it's not regarded a mob. Retrieve
- // the mob cell from the mob file, and write it back to the store file.
- Cell mobCell = mobStore.resolve(c, false);
- if (mobCell.getValueLength() != 0) {
- // put the mob data back to the store file
- PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
- writer.append(mobCell);
- cellsCountCompactedFromMob++;
- cellsSizeCompactedFromMob += mobCell.getValueLength();
- } else {
- // If the value of a file is empty, there might be issues when retrieving,
- // directly write the cell to the store file, and leave it to be handled by the
- // next compaction.
- writer.append(c);
+ // the mob cell from the mob file, and write it back to the store file. Must
+ // close the mob scanner once the life cycle finished.
+ try (MobCell mobCell = mobStore.resolve(c, false)) {
+ if (mobCell.getCell().getValueLength() != 0) {
+ // put the mob data back to the store file
+ PrivateCellUtil.setSequenceId(mobCell.getCell(), c.getSequenceId());
+ writer.append(mobCell.getCell());
+ cellsCountCompactedFromMob++;
+ cellsSizeCompactedFromMob += mobCell.getCell().getValueLength();
+ } else {
+ // If the value of a file is empty, there might be issues when retrieving,
+ // directly write the cell to the store file, and leave it to be handled by the
+ // next compaction.
+ writer.append(c);
+ }
}
}
} else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java
new file mode 100644
index 0000000..ec956a2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mob;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The MobCell will maintain a {@link Cell} and a {@link StoreFileScanner} inside. Now, the mob cell
+ * is backed by NIO ByteBuffers which are allocated from ByteBuffAllocator, so we cannot just read
+ * the cell and close the MOB file scanner because the MOB file scanner closing will deallocate the
+ * NIO ByteBuffers, which would result in a memory leak.
+ * <p>
+ * Actually, the right solution is: <br>
+ * 1. Read the normal cell; <br>
+ * 2. Parse the value of normal cell and get MOB fileName,offset,length; <br>
+ * 3. Open scanner to read the mob value; <br>
+ * 4. Construct the response cell whose key is from the normal cell and value is from the mob cell.
+ * <br>
+ * 5. Ship the response cell to HBase client. <br>
+ * 6. Release both normal cell's block and mob cell's block. <br>
+ * <p>
+ * For mob cell, the block releasing just means closing the mob scanner, so here we need to keep
+ * the {@link StoreFileScanner} inside and close it only when we're sure that the MobCell has been
+ * shipped to RPC client.
+ */
+@InterfaceAudience.Private
+public class MobCell implements Closeable {
+
+ private final Cell cell;
+ private final StoreFileScanner sfScanner;
+
+ public MobCell(Cell cell) {
+ this.cell = cell;
+ this.sfScanner = null;
+ }
+
+ public MobCell(Cell cell, StoreFileScanner sfScanner) {
+ this.cell = cell;
+ this.sfScanner = sfScanner;
+ }
+
+ public Cell getCell() {
+ return cell;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.sfScanner != null) {
+ this.sfScanner.close();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
index 1d0d5ff..43abd39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -70,7 +71,7 @@ public class MobFile {
* @return The cell in the mob file.
* @throws IOException
*/
- public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
+ public MobCell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
return readCell(search, cacheMobBlocks, sf.getMaxMemStoreTS());
}
@@ -82,26 +83,26 @@ public class MobFile {
* @return The cell in the mob file.
* @throws IOException
*/
- public Cell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException {
- Cell result = null;
+ public MobCell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException {
StoreFileScanner scanner = null;
- List<HStoreFile> sfs = new ArrayList<>();
- sfs.add(sf);
+ boolean succ = false;
try {
- List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
- cacheMobBlocks, true, false, false, readPt);
+ List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
+ Collections.singletonList(sf), cacheMobBlocks, true, false, false, readPt);
if (!sfScanners.isEmpty()) {
scanner = sfScanners.get(0);
if (scanner.seek(search)) {
- result = scanner.peek();
+ MobCell mobCell = new MobCell(scanner.peek(), scanner);
+ succ = true;
+ return mobCell;
}
}
+ return null;
} finally {
- if (scanner != null) {
+ if (scanner != null && !succ) {
scanner.close();
}
}
- return result;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 596aa3d..b8ea960 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
+import org.apache.hadoop.hbase.mob.MobCell;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileCache;
@@ -298,14 +298,14 @@ public class HMobStore extends HStore {
}
/**
- * Reads the cell from the mob file, and the read point does not count.
- * This is used for DefaultMobStoreCompactor where we can read empty value for the missing cell.
+ * Reads the cell from the mob file, and the read point does not count. This is used for
+ * DefaultMobStoreCompactor where we can read empty value for the missing cell.
* @param reference The cell found in the HBase, its value is a path to a mob file.
* @param cacheBlocks Whether the scanner should cache blocks.
* @return The cell found in the mob file.
* @throws IOException
*/
- public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+ public MobCell resolve(Cell reference, boolean cacheBlocks) throws IOException {
return resolve(reference, cacheBlocks, -1, true);
}
@@ -314,14 +314,14 @@ public class HMobStore extends HStore {
* @param reference The cell found in the HBase, its value is a path to a mob file.
* @param cacheBlocks Whether the scanner should cache blocks.
* @param readPt the read point.
- * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is
- * missing or corrupt.
+ * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or
+ * corrupt.
* @return The cell found in the mob file.
* @throws IOException
*/
- public Cell resolve(Cell reference, boolean cacheBlocks, long readPt,
- boolean readEmptyValueOnMobCellMiss) throws IOException {
- Cell result = null;
+ public MobCell resolve(Cell reference, boolean cacheBlocks, long readPt,
+ boolean readEmptyValueOnMobCellMiss) throws IOException {
+ MobCell mobCell = null;
if (MobUtils.hasValidMobRefCellValue(reference)) {
String fileName = MobUtils.getMobFileName(reference);
Tag tableNameTag = MobUtils.getTableNameTag(reference);
@@ -336,35 +336,34 @@ public class HMobStore extends HStore {
locations = new ArrayList<>(2);
TableName tn = TableName.valueOf(tableNameString);
locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
- locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
- .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
+ locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
+ MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
map.put(tableNameString, locations);
}
} finally {
keyLock.releaseLockEntry(lockEntry);
}
}
- result = readCell(locations, fileName, reference, cacheBlocks, readPt,
+ mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
readEmptyValueOnMobCellMiss);
}
}
- if (result == null) {
+ if (mobCell == null) {
LOG.warn("The Cell result is null, assemble a new Cell with the same row,family,"
+ "qualifier,timestamp,type and tags but with an empty value to return.");
- result = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
- .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
- .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
- reference.getFamilyLength())
- .setQualifier(reference.getQualifierArray(),
- reference.getQualifierOffset(), reference.getQualifierLength())
- .setTimestamp(reference.getTimestamp())
- .setType(reference.getTypeByte())
- .setValue(HConstants.EMPTY_BYTE_ARRAY)
- .setTags(reference.getTagsArray(), reference.getTagsOffset(),
- reference.getTagsLength())
- .build();
+ Cell cell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
+ .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
+ .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
+ reference.getFamilyLength())
+ .setQualifier(reference.getQualifierArray(), reference.getQualifierOffset(),
+ reference.getQualifierLength())
+ .setTimestamp(reference.getTimestamp()).setType(reference.getTypeByte())
+ .setValue(HConstants.EMPTY_BYTE_ARRAY)
+ .setTags(reference.getTagsArray(), reference.getTagsOffset(), reference.getTagsLength())
+ .build();
+ mobCell = new MobCell(cell);
}
- return result;
+ return mobCell;
}
/**
@@ -383,8 +382,8 @@ public class HMobStore extends HStore {
* @return The found cell. Null if there's no such a cell.
* @throws IOException
*/
- private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
- long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
+ private MobCell readCell(List<Path> locations, String fileName, Cell search,
+ boolean cacheMobBlocks, long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
FileSystem fs = getFileSystem();
Throwable throwable = null;
for (Path location : locations) {
@@ -392,12 +391,8 @@ public class HMobStore extends HStore {
Path path = new Path(location, fileName);
try {
file = mobFileCache.openFile(fs, path, cacheConf);
- Cell cell = readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
+ return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
: file.readCell(search, cacheMobBlocks);
- // Now we will return blocks to allocator for mob cells before shipping to rpc client.
- // it will be memory leak. so just copy cell as an on-heap KV here. will remove this in
- // HBASE-22122 (TODO)
- return KeyValueUtil.copyToNewKeyValue(cell);
} catch (IOException e) {
mobFileCache.evictFile(fileName);
throwable = e;
@@ -425,7 +420,7 @@ public class HMobStore extends HStore {
}
}
LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
- + " or it is corrupt");
+ + " or it is corrupt");
if (readEmptyValueOnMobCellMiss) {
return null;
} else if ((throwable instanceof FileNotFoundException)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
index b9f9af8..76144f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -19,13 +19,17 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobCell;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into
@@ -34,10 +38,13 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class MobStoreScanner extends StoreScanner {
+ private static final Logger LOG = LoggerFactory.getLogger(MobStoreScanner.class);
+
private boolean cacheMobBlocks = false;
private boolean rawMobScan = false;
private boolean readEmptyValueOnMobCellMiss = false;
private final HMobStore mobStore;
+ private final List<MobCell> referencedMobCells;
public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
final NavigableSet<byte[]> columns, long readPt) throws IOException {
@@ -49,6 +56,7 @@ public class MobStoreScanner extends StoreScanner {
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
}
mobStore = (HMobStore) store;
+ this.referencedMobCells = new ArrayList<>();
}
/**
@@ -69,11 +77,13 @@ public class MobStoreScanner extends StoreScanner {
for (int i = 0; i < outResult.size(); i++) {
Cell cell = outResult.get(i);
if (MobUtils.isMobReferenceCell(cell)) {
- Cell mobCell = mobStore
- .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
+ MobCell mobCell =
+ mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
mobKVCount++;
- mobKVSize += mobCell.getValueLength();
- outResult.set(i, mobCell);
+ mobKVSize += mobCell.getCell().getValueLength();
+ outResult.set(i, mobCell.getCell());
+ // Keep the MobCell here until we have shipped the RPC response or closed the scanner.
+ referencedMobCells.add(mobCell);
}
}
mobStore.updateMobScanCellsCount(mobKVCount);
@@ -81,4 +91,27 @@ public class MobStoreScanner extends StoreScanner {
}
return result;
}
+
+ private void freeAllReferencedMobCells() throws IOException {
+ for (MobCell cell : referencedMobCells) {
+ cell.close();
+ }
+ referencedMobCells.clear();
+ }
+
+ @Override
+ public void shipped() throws IOException {
+ super.shipped();
+ this.freeAllReferencedMobCells();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ try {
+ this.freeAllReferencedMobCells();
+ } catch (IOException e) {
+ LOG.warn("Failed to free referenced mob cells: ", e);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1586f1c..2b8bba0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -291,7 +291,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
- protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
+ public static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
// Request counter. (Includes requests that are not serviced by regions.)
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
index d64c372..a3d779c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@ -19,26 +19,31 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobCell;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support
- * reversed scanning in both the memstore and the MOB store.
- *
+ * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support reversed
+ * scanning in both the memstore and the MOB store.
*/
@InterfaceAudience.Private
public class ReversedMobStoreScanner extends ReversedStoreScanner {
+ private static final Logger LOG = LoggerFactory.getLogger(ReversedMobStoreScanner.class);
private boolean cacheMobBlocks = false;
private boolean rawMobScan = false;
private boolean readEmptyValueOnMobCellMiss = false;
- protected final HMobStore mobStore;
+ private final HMobStore mobStore;
+ private final List<MobCell> referencedMobCells;
ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
@@ -50,6 +55,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
}
mobStore = (HMobStore) store;
+ this.referencedMobCells = new ArrayList<>();
}
/**
@@ -70,11 +76,13 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
for (int i = 0; i < outResult.size(); i++) {
Cell cell = outResult.get(i);
if (MobUtils.isMobReferenceCell(cell)) {
- Cell mobCell = mobStore
- .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
+ MobCell mobCell =
+ mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
mobKVCount++;
- mobKVSize += mobCell.getValueLength();
- outResult.set(i, mobCell);
+ mobKVSize += mobCell.getCell().getValueLength();
+ outResult.set(i, mobCell.getCell());
+ // Keep the MobCell here until we have shipped the RPC response or closed the scanner.
+ referencedMobCells.add(mobCell);
}
}
mobStore.updateMobScanCellsCount(mobKVCount);
@@ -82,4 +90,27 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
}
return result;
}
+
+ private void freeAllReferencedMobCells() throws IOException {
+ for (MobCell mobCell : referencedMobCells) {
+ mobCell.close();
+ }
+ referencedMobCells.clear();
+ }
+
+ @Override
+ public void shipped() throws IOException {
+ super.shipped();
+ this.freeAllReferencedMobCells();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ try {
+ this.freeAllReferencedMobCells();
+ } catch (IOException e) {
+ LOG.warn("Failed to free referenced mob cells: ", e);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index b5b853a..6e70c5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -97,18 +97,6 @@ public class StoreFileScanner implements KeyValueScanner {
this.reader.incrementRefCount();
}
- boolean isPrimaryReplica() {
- return reader.isPrimaryReplicaReader();
- }
-
- /**
- * Return an array of scanners corresponding to the given set of store files.
- */
- public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
- boolean cacheBlocks, boolean usePread, long readPt) throws IOException {
- return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt);
- }
-
/**
* Return an array of scanners corresponding to the given set of store files.
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
index bb194b6..d274db3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
@@ -113,46 +113,45 @@ public class TestCachedMobFile {
Path testDir = TEST_UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
- StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
- .withOutputDir(testDir).withFileContext(meta).build();
+ StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir)
+ .withFileContext(meta).build();
String caseName = testName.getMethodName();
MobTestUtil.writeStoreFile(writer, caseName);
CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
byte[] family = Bytes.toBytes(caseName);
byte[] qualify = Bytes.toBytes(caseName);
// Test the start key
- byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
+ byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
KeyValue expectedKey =
new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
KeyValue seekKey = expectedKey.createKeyOnly(false);
- Cell cell = cachedMobFile.readCell(seekKey, false);
+ Cell cell = cachedMobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the end key
- byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
+ byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
seekKey = expectedKey.createKeyOnly(false);
- cell = cachedMobFile.readCell(seekKey, false);
+ cell = cachedMobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the random key
byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
seekKey = expectedKey.createKeyOnly(false);
- cell = cachedMobFile.readCell(seekKey, false);
+ cell = cachedMobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the key which is less than the start key
byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
- cell = cachedMobFile.readCell(seekKey, false);
+ cell = cachedMobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the key which is more than the end key
byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
- cell = cachedMobFile.readCell(seekKey, false);
- Assert.assertNull(cell);
+ Assert.assertNull(cachedMobFile.readCell(seekKey, false));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
index c22ca98..297c19f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
@@ -43,8 +43,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Category(SmallTests.class)
public class TestMobFile {
@@ -53,7 +51,6 @@ public class TestMobFile {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobFile.class);
- static final Logger LOG = LoggerFactory.getLogger(TestMobFile.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Configuration conf = TEST_UTIL.getConfiguration();
private CacheConfig cacheConf = new CacheConfig(conf);
@@ -64,11 +61,9 @@ public class TestMobFile {
public void testReadKeyValue() throws Exception {
Path testDir = TEST_UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
- HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
- StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
- .withOutputDir(testDir)
- .withFileContext(meta)
- .build();
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+ StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir)
+ .withFileContext(meta).build();
String caseName = testName.getMethodName();
MobTestUtil.writeStoreFile(writer, caseName);
@@ -78,39 +73,38 @@ public class TestMobFile {
byte[] qualify = Bytes.toBytes(caseName);
// Test the start key
- byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
+ byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
KeyValue expectedKey =
new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
KeyValue seekKey = expectedKey.createKeyOnly(false);
- Cell cell = mobFile.readCell(seekKey, false);
+ Cell cell = mobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the end key
- byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
+ byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
seekKey = expectedKey.createKeyOnly(false);
- cell = mobFile.readCell(seekKey, false);
+ cell = mobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the random key
byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
seekKey = expectedKey.createKeyOnly(false);
- cell = mobFile.readCell(seekKey, false);
+ cell = mobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the key which is less than the start key
byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
- cell = mobFile.readCell(seekKey, false);
+ cell = mobFile.readCell(seekKey, false).getCell();
MobTestUtil.assertCellEquals(expectedKey, cell);
// Test the key which is more than the end key
byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
- cell = mobFile.readCell(seekKey, false);
- assertNull(cell);
+ assertNull(mobFile.readCell(seekKey, false));
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java
new file mode 100644
index 0000000..a527740
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the MOB feature when the RPC ByteBuffAllocator is enabled (HBASE-22122)
+ */
+@Category({ MediumTests.class })
+public class TestMobWithByteBuffAllocator {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMobWithByteBuffAllocator.class);
+
+ private static final String TABLE_NAME = "TestMobWithByteBuffAllocator";
+ private static final Logger LOG = LoggerFactory.getLogger(TestMobWithByteBuffAllocator.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final Configuration CONF = UTIL.getConfiguration();
+ private static final byte[] FAMILY = Bytes.toBytes("f");
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // Must use the ByteBuffAllocator here
+ CONF.setBoolean(RSRpcServices.RESERVOIR_ENABLED_KEY, true);
+ // Must use OFF-HEAP BucketCache here.
+ CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.1f);
+ CONF.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
+ // 32MB for BucketCache.
+ CONF.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 32);
+ CONF.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testReadingCellsFromHFile() throws Exception {
+ TableName tableName = TableName.valueOf(TABLE_NAME);
+ MobSnapshotTestingUtils.createMobTable(UTIL, tableName, 1, FAMILY);
+ LOG.info("Create an mob table {} successfully.", tableName);
+
+ int expectedRows = 500;
+ SnapshotTestingUtils.loadData(UTIL, tableName, expectedRows, FAMILY);
+ LOG.info("Load 500 rows data into table {} successfully.", tableName);
+
+ // Flush all the data into HFiles.
+ try (Admin admin = UTIL.getConnection().getAdmin()) {
+ admin.flush(tableName);
+ }
+
+ // Scan the rows
+ MobSnapshotTestingUtils.verifyMobRowCount(UTIL, tableName, expectedRows);
+
+ // Scan the rows in reverse order
+ int rows = 0;
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) {
+ for (Result res; (res = scanner.next()) != null;) {
+ rows++;
+ for (Cell cell : res.listCells()) {
+ Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
+ }
+ }
+ }
+ }
+ Assert.assertEquals(expectedRows, rows);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index bf1f18e..152ea87 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -448,17 +448,14 @@ public class TestHMobStore {
String targetPathName = MobUtils.formatDate(currentDate);
Path targetPath = new Path(store.getPath(), targetPathName);
store.commitFile(mobFilePath, targetPath);
- //resolve
- Cell resultCell1 = store.resolve(seekKey1, false);
- Cell resultCell2 = store.resolve(seekKey2, false);
- Cell resultCell3 = store.resolve(seekKey3, false);
- //compare
- Assert.assertEquals(Bytes.toString(value),
- Bytes.toString(CellUtil.cloneValue(resultCell1)));
- Assert.assertEquals(Bytes.toString(value),
- Bytes.toString(CellUtil.cloneValue(resultCell2)));
- Assert.assertEquals(Bytes.toString(value2),
- Bytes.toString(CellUtil.cloneValue(resultCell3)));
+ // resolve
+ Cell resultCell1 = store.resolve(seekKey1, false).getCell();
+ Cell resultCell2 = store.resolve(seekKey2, false).getCell();
+ Cell resultCell3 = store.resolve(seekKey3, false).getCell();
+ // compare
+ Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell1)));
+ Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell2)));
+ Assert.assertEquals(Bytes.toString(value2), Bytes.toString(CellUtil.cloneValue(resultCell3)));
}
/**