Posted to commits@hbase.apache.org by bi...@apache.org on 2019/11/08 01:39:04 UTC
[hbase] branch branch-2.2 updated: HBASE-22480 Get block from BlockCache once and return this block to BlockCache twice make ref count error.
This is an automated email from the ASF dual-hosted git repository.
binlijin pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 2855f14 HBASE-22480 Get block from BlockCache once and return this block to BlockCache twice make ref count error.
2855f14 is described below
commit 2855f14b586e462d6ee3e389ae190d94c17f04a3
Author: binlijin <bi...@gmail.com>
AuthorDate: Fri Nov 8 09:38:53 2019 +0800
HBASE-22480 Get block from BlockCache once and return this block to BlockCache twice make ref count error.
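In short: a block fetched from the BlockCache carries a reference count that must be decremented exactly once per fetch. Before this patch, a scanner could return a block that also was (or became) its curBlock, which the shipped()/close() path returns a second time. A hedged sketch of the hazard, reusing identifiers from the diff below (simplified, not a verbatim excerpt):

  // Pre-fix shape of the seekBefore path:
  HFileBlock seekToBlock = ...;      // fetched from BlockCache: refCount +1
  reader.returnBlock(seekToBlock);   // refCount -1, unconditionally
  // If seekToBlock is also this.curBlock, shipped()/close() later returns
  // the same block again: one get, two returns, and the count goes wrong.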
---
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 104 +++++++++-----
.../hadoop/hbase/io/hfile/TestHFileReaderImpl.java | 156 +++++++++++++++++++++
2 files changed, 224 insertions(+), 36 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 69f45be..d1b3a89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -560,6 +560,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
this.curBlock = null;
}
}
+
@Override
public boolean isSeeked(){
return blockBuffer != null;
@@ -881,13 +882,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// The key we are interested in
if (previousBlockOffset == -1) {
// we have a 'problem', the key we want is the first of the file.
+ releaseIfNotCurBlock(seekToBlock);
return false;
}
// The first key in the current block 'seekToBlock' is greater than the given
// seekBefore key. We will go ahead by reading the next block that satisfies the
// given key. Return the current block before reading the next one.
- reader.returnBlock(seekToBlock);
+ releaseIfNotCurBlock(seekToBlock);
+
// It is important that we compute and pass onDiskSize to the block
// reader so that it does not have to read the header separately to
// figure out the size. Currently, we do not have a way to do this
@@ -905,6 +908,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
/**
+ * The curBlock will be released by the shipped() or close() method, so we only need to
+ * release the given block if it was read from the HFile and is not referenced by curBlock.
+ */
+ protected void releaseIfNotCurBlock(HFileBlock block) {
+ if (curBlock != block) {
+ reader.returnBlock(block);
+ }
+ }
+
+ /**
* Scans blocks in the "scanned" section of the {@link HFile} until the next
* data block is found.
*
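The fix routes every such early-exit through the new helper. A condensed sketch of the call pattern from the seekBefore hunk above (abridged):

  if (previousBlockOffset == -1) {
    // pre-fix, this path returned without releasing the block at all (a leak)
    releaseIfNotCurBlock(seekToBlock);
    return false;
  }
  // was an unconditional reader.returnBlock(seekToBlock) before the fix;
  // the helper is a no-op when seekToBlock is curBlock, whose release is
  // owned by shipped()/close()
  releaseIfNotCurBlock(seekToBlock);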
@@ -1141,6 +1154,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
if (newBlock.getOffset() < 0) {
+ releaseIfNotCurBlock(newBlock);
throw new IOException(
"Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
}
@@ -1197,19 +1211,23 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
* @param newBlock the block to make current
*/
protected void updateCurrentBlock(HFileBlock newBlock) throws IOException {
- // Set the active block on the reader
- // sanity check
- if (newBlock.getBlockType() != BlockType.DATA) {
- throw new IllegalStateException("ScannerV2 works only on data " + "blocks, got "
- + newBlock.getBlockType() + "; " + "HFileName=" + reader.getPath()
- + ", " + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + "isCompaction="
- + isCompaction);
- }
+ try {
+ // Set the active block on the reader
+ // sanity check
+ if (newBlock.getBlockType() != BlockType.DATA) {
+ throw new IllegalStateException(
+ "ScannerV2 works only on data " + "blocks, got " + newBlock.getBlockType() + "; "
+ + "HFileName=" + reader.getPath() + ", " + "dataBlockEncoder=" + reader
+ .getDataBlockEncoding() + ", " + "isCompaction=" + isCompaction);
+ }
- updateCurrBlockRef(newBlock);
- blockBuffer = newBlock.getBufferWithoutHeader();
- readKeyValueLen();
- blockFetches.incrementAndGet();
+ updateCurrBlockRef(newBlock);
+ blockBuffer = newBlock.getBufferWithoutHeader();
+ readKeyValueLen();
+ blockFetches.incrementAndGet();
+ } finally {
+ releaseIfNotCurBlock(newBlock);
+ }
// Reset the next indexed key
this.nextIndexedKey = null;
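Both scanner implementations now share the same try/finally shape (the EncodedScanner hunk appears further down). Condensed, with sanityCheck standing in for the block-type checks:

  try {
    sanityCheck(newBlock);         // may throw before ownership transfer
    updateCurrBlockRef(newBlock);  // from here on, curBlock == newBlock
    // ... buffer and seeker setup ...
  } finally {
    // a no-op after a successful transfer; releases the block exactly
    // once if any step above threw first
    releaseIfNotCurBlock(newBlock);
  }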
@@ -1321,7 +1339,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
cache.returnBlock(cacheKey, compressedBlock);
}
}
- validateBlockType(cachedBlock, expectedBlockType);
+ try {
+ validateBlockType(cachedBlock, expectedBlockType);
+ } catch (IOException e) {
+ returnAndEvictBlock(cache, cacheKey, cachedBlock);
+ throw e;
+ }
if (expectedDataBlockEncoding == null) {
return cachedBlock;
@@ -1356,8 +1379,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
+ path);
// This is an error scenario. so here we need to decrement the
// count.
- cache.returnBlock(cacheKey, cachedBlock);
- cache.evictBlock(cacheKey);
+ returnAndEvictBlock(cache, cacheKey, cachedBlock);
}
return null;
}
@@ -1367,6 +1389,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return null;
}
+ private void returnAndEvictBlock(BlockCache cache, BlockCacheKey cacheKey, Cacheable block) {
+ cache.returnBlock(cacheKey, block);
+ cache.evictBlock(cacheKey);
+ }
+
/**
* @param metaBlockName
* @param cacheBlock Add block to cache, if found
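The new returnAndEvictBlock helper pairs its two cache calls deliberately; an annotated restatement of the method added above:

  private void returnAndEvictBlock(BlockCache cache, BlockCacheKey cacheKey, Cacheable block) {
    // drop this reader's reference first...
    cache.returnBlock(cacheKey, block);
    // ...then evict, so a block that failed type or encoding validation is
    // not served to the next caller and will be re-read from the filesystem
    cache.evictBlock(cacheKey);
  }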
@@ -1474,10 +1501,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Validate encoding type for data blocks. We include encoding
// type in the cache key, and we expect it to match on a cache hit.
if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
- throw new IOException("Cached block under key " + cacheKey + " "
- + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
- + dataBlockEncoder.getDataBlockEncoding() + ")"
- + ", path=" + path);
+ cacheConf.getBlockCache().ifPresent(cache -> {
+ returnAndEvictBlock(cache, cacheKey, cachedBlock);
+ });
+ throw new IOException(
+ "Cached block under key " + cacheKey + " " + "has wrong encoding: "
+ + cachedBlock.getDataBlockEncoding() + " (expected: " + dataBlockEncoder
+ .getDataBlockEncoding() + ")" + ", path=" + path);
}
}
// Cache-hit. Return!
@@ -1640,23 +1670,25 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
*/
@Override
protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
-
- // sanity checks
- if (newBlock.getBlockType() != BlockType.ENCODED_DATA) {
- throw new IllegalStateException("EncodedScanner works only on encoded data blocks");
- }
- short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
- if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
- String encoderCls = dataBlockEncoder.getClass().getName();
- throw new CorruptHFileException("Encoder " + encoderCls
- + " doesn't support data block encoding "
- + DataBlockEncoding.getNameFromId(dataBlockEncoderId)
- + ", path=" + reader.getPath());
+ try {
+ // sanity checks
+ if (newBlock.getBlockType() != BlockType.ENCODED_DATA) {
+ throw new IllegalStateException("EncodedScanner works only on encoded data blocks");
+ }
+ short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
+ if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
+ String encoderCls = dataBlockEncoder.getClass().getName();
+ throw new CorruptHFileException(
+ "Encoder " + encoderCls + " doesn't support data block encoding " + DataBlockEncoding
+ .getNameFromId(dataBlockEncoderId) + ", path=" + reader.getPath());
+ }
+ updateCurrBlockRef(newBlock);
+ ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
+ seeker.setCurrentBuffer(encodedBuffer);
+ blockFetches.incrementAndGet();
+ } finally {
+ releaseIfNotCurBlock(newBlock);
}
- updateCurrBlockRef(newBlock);
- ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
- seeker.setCurrentBuffer(encodedBuffer);
- blockFetches.incrementAndGet();
// Reset the next indexed key
this.nextIndexedKey = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java
new file mode 100644
index 0000000..33948d0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+
+/**
+ * Test that blocks fetched by HFileReaderImpl scanners are returned to the
+ * BlockCache exactly once, so reference counts drop back to zero (HBASE-22480).
+ */
+@Category({ IOTests.class, SmallTests.class })
+public class TestHFileReaderImpl {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHFileReaderImpl.class);
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ static KeyValue toKV(String row) {
+ return new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
+ Bytes.toBytes("value"));
+ }
+
+ static String toRowStr(Cell c) {
+ return Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+ }
+
+ Path makeNewFile() throws IOException {
+ Path ncTFile = new Path(TEST_UTIL.getDataTestDir(), "basic.hfile");
+ FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile);
+ int blocksize = toKV("a").getLength() * 3;
+ HFileContext context =
+ new HFileContextBuilder().withBlockSize(blocksize).withIncludesTags(true).build();
+ Configuration conf = TEST_UTIL.getConfiguration();
+ HFile.Writer writer =
+ HFile.getWriterFactoryNoCache(conf).withOutputStream(fout).withFileContext(context)
+ .withComparator(CellComparatorImpl.COMPARATOR).create();
+ // blocksize was chosen to hold roughly three KeyValues, so the five
+ // appends below span two data blocks (the transition is marked below)
+ writer.append(toKV("c"));
+ writer.append(toKV("e"));
+ writer.append(toKV("g"));
+ // block transition
+ writer.append(toKV("i"));
+ writer.append(toKV("k"));
+ writer.close();
+ fout.close();
+ return ncTFile;
+ }
+
+ @Test
+ public void testSeekBefore() throws Exception {
+ Path p = makeNewFile();
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Configuration conf = TEST_UTIL.getConfiguration();
+ int[] bucketSizes = { 512, 2048, 4096, 64 * 1024, 128 * 1024 };
+ BucketCache bucketcache =
+ new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
+
+ HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf, bucketcache), true, conf);
+ reader.loadFileInfo();
+
+ // warm cache
+ HFileScanner scanner = reader.getScanner(true, true);
+ scanner.seekTo(toKV("i"));
+ assertEquals("i", toRowStr(scanner.getCell()));
+ scanner.close();
+
+ while (bucketcache.getBlockCount() <= 0) {
+ Thread.sleep(10);
+ }
+
+ // reopen again.
+ scanner = reader.getScanner(true, true);
+ scanner.seekTo(toKV("i"));
+ assertEquals("i", toRowStr(scanner.getCell()));
+ scanner.seekBefore(toKV("i"));
+ assertEquals("g", toRowStr(scanner.getCell()));
+ scanner.close();
+
+ for (CachedBlock cachedBlock : Lists.newArrayList(bucketcache)) {
+ BlockCacheKey cacheKey =
+ new BlockCacheKey(cachedBlock.getFilename(), cachedBlock.getOffset());
+ int refCount = bucketcache.getRefCount(cacheKey);
+ assertEquals(0, refCount);
+ }
+
+ // case 2
+ scanner = reader.getScanner(true, true);
+ scanner.seekTo(toKV("i"));
+ assertEquals("i", toRowStr(scanner.getCell()));
+ scanner.seekBefore(toKV("c"));
+ scanner.close();
+ for (CachedBlock cachedBlock : Lists.newArrayList(bucketcache)) {
+ BlockCacheKey cacheKey =
+ new BlockCacheKey(cachedBlock.getFilename(), cachedBlock.getOffset());
+ int refCount = bucketcache.getRefCount(cacheKey);
+ assertEquals(0, refCount);
+ }
+
+ reader.close();
+
+ // clear bucketcache
+ for (CachedBlock cachedBlock : Lists.newArrayList(bucketcache)) {
+ BlockCacheKey cacheKey =
+ new BlockCacheKey(cachedBlock.getFilename(), cachedBlock.getOffset());
+ bucketcache.evictBlock(cacheKey);
+ }
+ bucketcache.shutdown();
+
+ deleteTestDir(fs);
+ }
+
+ protected void deleteTestDir(FileSystem fs) throws IOException {
+ Path dataTestDir = TEST_UTIL.getDataTestDir();
+ if (fs.exists(dataTestDir)) {
+ fs.delete(dataTestDir, true);
+ }
+ }
+
+}
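For anyone extending the new test: the ref-count check above is repeated verbatim after each scanner lifecycle. A hypothetical helper (not part of this commit) that factors it out:

  private static void assertAllRefCountsZero(BucketCache cache) {
    for (CachedBlock cachedBlock : Lists.newArrayList(cache)) {
      BlockCacheKey cacheKey =
          new BlockCacheKey(cachedBlock.getFilename(), cachedBlock.getOffset());
      assertEquals(0, cache.getRefCount(cacheKey));
    }
  }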