Posted to commits@hbase.apache.org by ha...@apache.org on 2023/03/20 11:26:56 UTC
[hbase] branch branch-2.5 updated: HBASE-27646 Should not use pread when prefetching in HFilePreadReader (#5063) (#5124)
This is an automated email from the ASF dual-hosted git repository.
haxiaolin pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 0bc7f56a006 HBASE-27646 Should not use pread when prefetching in HFilePreadReader (#5063) (#5124)
0bc7f56a006 is described below
commit 0bc7f56a006d13a1c6cad58550c066bdd0b2e973
Author: Xiaolin Ha <ha...@apache.org>
AuthorDate: Mon Mar 20 19:26:47 2023 +0800
HBASE-27646 Should not use pread when prefetching in HFilePreadReader (#5063) (#5124)
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
.../hadoop/hbase/io/FSDataInputStreamWrapper.java | 9 +++
.../java/org/apache/hadoop/hbase/io/FileLink.java | 6 +-
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 20 ++++++-
.../hbase/io/hfile/ReaderContextBuilder.java | 12 ++++
.../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 66 +++++++++++++++++++++-
5 files changed, 109 insertions(+), 4 deletions(-)
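
The heart of the change is in HFilePreadReader below: the prefetch loop previously fetched every block with positional reads (pread), and now opens a dedicated STREAM-type reader so blocks are read sequentially instead. A minimal sketch of the pread-versus-stream distinction at the plain Hadoop FSDataInputStream level (the class name and file path here are hypothetical; only the stream API calls are real):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PreadVsStream {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Hypothetical file; any sufficiently large file behaves the same way.
        FSDataInputStream in = fs.open(new Path("/tmp/example-hfile"));
        byte[] buf = new byte[4096];

        // pread: stateless positional read; the stream's file pointer is untouched.
        // Fine for random access, but a whole-file scan pays per-call setup cost.
        in.read(0L, buf, 0, buf.length);

        // Stream read: seek once, then read forward. HDFS can keep the block
        // reader and readahead state across calls, which is cheaper when
        // walking the entire file, exactly what prefetch does.
        in.seek(0L);
        in.readFully(buf, 0, buf.length);

        in.close();
      }
    }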
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 3ad07622a4f..cb9dc84b94b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -97,6 +97,8 @@ public class FSDataInputStreamWrapper implements Closeable {
private Boolean instanceOfCanUnbuffer = null;
private CanUnbuffer unbuffer = null;
+ protected Path readerPath;
+
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, path, false, -1L);
}
@@ -127,6 +129,9 @@ public class FSDataInputStreamWrapper implements Closeable {
// Initially we are going to read the tail block. Open the reader w/FS checksum.
this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+ this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
+ ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
+ : path;
setStreamOptions(stream);
}
@@ -342,4 +347,8 @@ public class FSDataInputStreamWrapper implements Closeable {
}
}
}
+
+ public Path getReaderPath() {
+ return readerPath;
+ }
}
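
The new getReaderPath() accessor reports the file the wrapper is actually reading: the FileLink's current target when the stream is link-backed, otherwise the path handed to the constructor. A hedged usage sketch (the fs and hfilePath variables are assumed to exist):

    // Resolve the real file behind a wrapper so that a second, independent
    // reader can be opened against the same underlying file.
    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
    Path resolved = wrapper.getReaderPath(); // link target, or hfilePath itself
    FSDataInputStreamWrapper streamCopy = new FSDataInputStreamWrapper(fs, resolved);

This is exactly the pattern HFilePreadReader uses below to build its prefetch stream reader.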
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index 86f8f935334..d38e6974813 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -83,7 +83,7 @@ public class FileLink {
* FileLink InputStream that handles the switch between the original path and the alternative
* locations, when the file is moved.
*/
- private static class FileLinkInputStream extends InputStream
+ protected static class FileLinkInputStream extends InputStream
implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
private FSDataInputStream in = null;
private Path currentPath = null;
@@ -286,6 +286,10 @@ public class FileLink {
public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
in.setDropBehind(dropCache);
}
+
+ public Path getCurrentPath() {
+ return currentPath;
+ }
}
private Path[] locations = null;
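
getCurrentPath() lets a caller see which of the link's candidate locations the open stream resolved to, e.g. the original region directory versus the archive. A sketch mirroring the instanceof check in FSDataInputStreamWrapper above (the link and fs variables are assumed; note FileLinkInputStream is now protected rather than private, so this only works for subclasses and same-package callers such as the wrapper):

    FSDataInputStream in = link.open(fs);
    InputStream wrapped = in.getWrappedStream();
    if (wrapped instanceof FileLink.FileLinkInputStream) {
      // The location currently being read: the original path or an alternative.
      Path current = ((FileLink.FileLinkInputStream) wrapped).getCurrentPath();
    }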
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 0eb2aa7db00..1bd3d5d674f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,15 @@ public class HFilePreadReader extends HFileReaderImpl {
public void run() {
long offset = 0;
long end = 0;
+ HFile.Reader prefetchStreamReader = null;
try {
+ ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context)
+ .withReaderType(ReaderContext.ReaderType.STREAM)
+ .withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(),
+ context.getInputStreamWrapper().getReaderPath()))
+ .build();
+ prefetchStreamReader =
+ new HFileStreamReader(streamReaderContext, fileInfo, cacheConf, conf);
end = getTrailer().getLoadOnOpenDataOffset();
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
@@ -56,8 +65,8 @@ public class HFilePreadReader extends HFileReaderImpl {
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
- HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
- /* pread= */true, false, false, null, null, true);
+ HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
+ /* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
@@ -77,6 +86,13 @@ public class HFilePreadReader extends HFileReaderImpl {
// Other exceptions are interesting
LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
} finally {
+ if (prefetchStreamReader != null) {
+ try {
+ prefetchStreamReader.close(false);
+ } catch (IOException e) {
+ LOG.warn("Close prefetch stream reader failed, path: " + path, e);
+ }
+ }
PrefetchExecutor.complete(path);
}
}
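
In short, prefetch now opens a throwaway STREAM reader over the same underlying file, walks the blocks with pread=false so they are fetched sequentially, and closes that reader when done. Condensed from the diff above (loop body and error handling trimmed), the lifecycle is:

    HFile.Reader prefetchStreamReader = null;
    try {
      ReaderContext streamCtx = ReaderContextBuilder.newBuilder(context)
        .withReaderType(ReaderContext.ReaderType.STREAM)
        .withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(),
          context.getInputStreamWrapper().getReaderPath()))
        .build();
      prefetchStreamReader = new HFileStreamReader(streamCtx, fileInfo, cacheConf, conf);
      // ...readBlock(offset, ..., /* pread= */ false, ...) in a loop, caching each block...
    } finally {
      if (prefetchStreamReader != null) {
        prefetchStreamReader.close(false); // false = do not evict the blocks just cached
      }
    }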
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
index 0ec3de58fff..718f7fcb78a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
@@ -43,6 +43,18 @@ public class ReaderContextBuilder {
public ReaderContextBuilder() {
}
+ public static ReaderContextBuilder newBuilder(ReaderContext readerContext) {
+ return new ReaderContextBuilder(readerContext);
+ }
+
+ private ReaderContextBuilder(ReaderContext readerContext) {
+ this.filePath = readerContext.getFilePath();
+ this.fsdis = readerContext.getInputStreamWrapper();
+ this.fileSize = readerContext.getFileSize();
+ this.hfs = readerContext.getFileSystem();
+ this.type = readerContext.getReaderType();
+ }
+
public ReaderContextBuilder withFilePath(Path filePath) {
this.filePath = filePath;
return this;
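
The private copy constructor plus the static newBuilder(ReaderContext) factory is what enables the "clone the context, change one field" call in HFilePreadReader. A short usage sketch (preadContext is a hypothetical existing ReaderContext):

    // Derive a STREAM-type context from an existing PREAD one: file path, file
    // size, filesystem, and stream wrapper are copied; only the type changes.
    ReaderContext streamCtx = ReaderContextBuilder.newBuilder(preadContext)
      .withReaderType(ReaderContext.ReaderType.STREAM)
      .build();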
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 9844ebbf42f..d2b94177671 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,17 +47,27 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -226,6 +237,60 @@ public class TestPrefetch {
}
+ @Test
+ public void testPrefetchDoesntSkipHFileLink() throws Exception {
+ testPrefetchWhenHFileLink(c -> {
+ boolean isCached = c != null;
+ assertTrue(isCached);
+ });
+ }
+
+ private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
+ cacheConf = new CacheConfig(conf, blockCache);
+ HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+ Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
+ final RegionInfo hri =
+ RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
+ // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
+ Configuration testConf = new Configuration(this.conf);
+ CommonFSUtils.setRootDir(testConf, testDir);
+ HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
+ CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
+
+ // Make a store file and write data to it.
+ StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
+ .withFilePath(regionFs.createTempName()).withFileContext(context).build();
+ TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
+ Bytes.toBytes("testPrefetchWhenHFileLink"));
+
+ Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
+ Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
+ HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
+ Path linkFilePath =
+ new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
+
+ // Try to open store file from link
+ StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
+ HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
+ assertTrue(storeFileInfo.isLink());
+
+ hsf.initReader();
+ HFile.Reader reader = hsf.getReader().getHFileReader();
+ while (!reader.prefetchComplete()) {
+ // Sleep for a bit
+ Thread.sleep(1000);
+ }
+ long offset = 0;
+ while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+ HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
+ BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
+ if (block.getBlockType() == BlockType.DATA) {
+ test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
+ }
+ offset += block.getOnDiskSizeWithHeader();
+ }
+ }
+
private Path writeStoreFile(String fname) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
return writeStoreFile(fname, meta);
@@ -263,5 +328,4 @@ public class TestPrefetch {
return keyType;
}
}
-
}