You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by ha...@apache.org on 2023/03/20 11:26:26 UTC

[hbase] branch branch-2 updated: HBASE-27646 Should not use pread when prefetching in HFilePreadReader (#5063) (#5122)

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 873f8987f14 HBASE-27646 Should not use pread when prefetching in HFilePreadReader (#5063) (#5122)
873f8987f14 is described below

commit 873f8987f14c42bb875f9cfc31b3e5f925ea5760
Author: Xiaolin Ha <ha...@apache.org>
AuthorDate: Mon Mar 20 19:26:17 2023 +0800

    HBASE-27646 Should not use pread when prefetching in HFilePreadReader (#5063) (#5122)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
 .../hadoop/hbase/io/FSDataInputStreamWrapper.java  |  9 ++++
 .../java/org/apache/hadoop/hbase/io/FileLink.java  |  6 ++-
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    | 20 +++++++-
 .../hbase/io/hfile/ReaderContextBuilder.java       | 12 +++++
 .../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 58 ++++++++++++++++++++++
 5 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 3ad07622a4f..cb9dc84b94b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -97,6 +97,8 @@ public class FSDataInputStreamWrapper implements Closeable {
   private Boolean instanceOfCanUnbuffer = null;
   private CanUnbuffer unbuffer = null;
 
+  protected Path readerPath;
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
     this(fs, path, false, -1L);
   }
@@ -127,6 +129,9 @@ public class FSDataInputStreamWrapper implements Closeable {
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
+      ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
+      : path;
     setStreamOptions(stream);
   }
 
@@ -342,4 +347,8 @@ public class FSDataInputStreamWrapper implements Closeable {
       }
     }
   }
+
+  public Path getReaderPath() {
+    return readerPath;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index 86f8f935334..d38e6974813 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -83,7 +83,7 @@ public class FileLink {
    * FileLink InputStream that handles the switch between the original path and the alternative
    * locations, when the file is moved.
    */
-  private static class FileLinkInputStream extends InputStream
+  protected static class FileLinkInputStream extends InputStream
     implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
     private FSDataInputStream in = null;
     private Path currentPath = null;
@@ -286,6 +286,10 @@ public class FileLink {
     public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
       in.setDropBehind(dropCache);
     }
+
+    public Path getCurrentPath() {
+      return currentPath;
+    }
   }
 
   private Path[] locations = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 440f4a7909a..2c71ce9f484 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,15 @@ public class HFilePreadReader extends HFileReaderImpl {
         public void run() {
           long offset = 0;
           long end = 0;
+          HFile.Reader prefetchStreamReader = null;
           try {
+            ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context)
+              .withReaderType(ReaderContext.ReaderType.STREAM)
+              .withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(),
+                context.getInputStreamWrapper().getReaderPath()))
+              .build();
+            prefetchStreamReader =
+              new HFileStreamReader(streamReaderContext, fileInfo, cacheConf, conf);
             end = getTrailer().getLoadOnOpenDataOffset();
             if (LOG.isTraceEnabled()) {
               LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
@@ -56,8 +65,8 @@ public class HFilePreadReader extends HFileReaderImpl {
               // the internal-to-hfileblock thread local which holds the overread that gets the
               // next header, will not have happened...so, pass in the onDiskSize gotten from the
               // cached block. This 'optimization' triggers extremely rarely I'd say.
-              HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
-                /* pread= */true, false, false, null, null, true);
+              HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
+                /* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
               try {
                 onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
                 offset += block.getOnDiskSizeWithHeader();
@@ -77,6 +86,13 @@ public class HFilePreadReader extends HFileReaderImpl {
             // Other exceptions are interesting
             LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
           } finally {
+            if (prefetchStreamReader != null) {
+              try {
+                prefetchStreamReader.close(false);
+              } catch (IOException e) {
+                LOG.warn("Close prefetch stream reader failed, path: " + path, e);
+              }
+            }
             PrefetchExecutor.complete(path);
           }
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
index 0ec3de58fff..718f7fcb78a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
@@ -43,6 +43,18 @@ public class ReaderContextBuilder {
   public ReaderContextBuilder() {
   }
 
+  public static ReaderContextBuilder newBuilder(ReaderContext readerContext) {
+    return new ReaderContextBuilder(readerContext);
+  }
+
+  private ReaderContextBuilder(ReaderContext readerContext) {
+    this.filePath = readerContext.getFilePath();
+    this.fsdis = readerContext.getInputStreamWrapper();
+    this.fileSize = readerContext.getFileSize();
+    this.hfs = readerContext.getFileSystem();
+    this.type = readerContext.getReaderType();
+  }
+
   public ReaderContextBuilder withFilePath(Path filePath) {
     this.filePath = filePath;
     return this;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index eb925716636..3e71c1c5004 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -56,16 +56,20 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -252,6 +256,14 @@ public class TestPrefetch {
     });
   }
 
+  @Test
+  public void testPrefetchDoesntSkipHFileLink() throws Exception {
+    testPrefetchWhenHFileLink(c -> {
+      boolean isCached = c != null;
+      assertTrue(isCached);
+    });
+  }
+
   private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
     throws Exception {
     cacheConf = new CacheConfig(conf, blockCache);
@@ -287,6 +299,52 @@ public class TestPrefetch {
     }
   }
 
+  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
+    cacheConf = new CacheConfig(conf, blockCache);
+    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
+    final RegionInfo hri =
+      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
+    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
+    Configuration testConf = new Configuration(this.conf);
+    CommonFSUtils.setRootDir(testConf, testDir);
+    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
+      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
+
+    // Make a store file and write data to it.
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
+      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
+    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
+      Bytes.toBytes("testPrefetchWhenHFileLink"));
+
+    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
+    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
+    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
+    Path linkFilePath =
+      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
+
+    // Try to open store file from link
+    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
+    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
+    assertTrue(storeFileInfo.isLink());
+
+    hsf.initReader();
+    HFile.Reader reader = hsf.getReader().getHFileReader();
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
+      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
+      if (block.getBlockType() == BlockType.DATA) {
+        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
+      }
+      offset += block.getOnDiskSizeWithHeader();
+    }
+  }
+
   private Path writeStoreFile(String fname) throws IOException {
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     return writeStoreFile(fname, meta);