Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/06 15:04:32 UTC

[hbase] 02/12: HBASE-25988 Store the store file list by a file (#3578)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26067
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit fc363a5e15686e5724ab1b22b7bf81e3ba6d73e5
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Aug 26 18:51:12 2021 +0800

    HBASE-25988 Store the store file list by a file (#3578)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../protobuf/server/region/StoreFileTracker.proto  |  29 +++--
 .../hadoop/hbase/regionserver/StoreContext.java    |   5 +
 .../hadoop/hbase/regionserver/StoreEngine.java     |   8 +-
 .../storefiletracker/DefaultStoreFileTracker.java  |   5 +-
 .../FileBasedStoreFileTracker.java                 | 142 +++++++++++++++++++++
 .../storefiletracker/StoreFileListFile.java        | 142 +++++++++++++++++++++
 .../storefiletracker/StoreFileTrackerBase.java     |  11 +-
 .../storefiletracker/StoreFileTrackerFactory.java  |  12 +-
 .../TestRegionWithFileBasedStoreFileTracker.java   | 109 ++++++++++++++++
 9 files changed, 430 insertions(+), 33 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-protocol-shaded/src/main/protobuf/server/region/StoreFileTracker.proto
similarity index 57%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
copy to hbase-protocol-shaded/src/main/protobuf/server/region/StoreFileTracker.proto
index 4f7231b..2a269ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/StoreFileTracker.proto
@@ -15,21 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver.storefiletracker;
+syntax = "proto2";
+// This file contains protocol buffers that are used for the store file tracker.
+package hbase.pb;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.StoreContext;
-import org.apache.yetus.audience.InterfaceAudience;
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "StoreFileTrackerProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
 
-/**
- * Factory method for creating store file tracker.
- */
-@InterfaceAudience.Private
-public final class StoreFileTrackerFactory {
+message StoreFileEntry {
+  required string name = 1;
+  required uint64 size = 2;
+}
 
-  public static StoreFileTracker create(Configuration conf, TableName tableName,
-    boolean isPrimaryReplica, StoreContext ctx) {
-    return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
-  }
+message StoreFileList {
+  required uint64 timestamp = 1;
+  repeated StoreFileEntry store_file = 2;
 }
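
As an aside, a minimal round trip of the two messages above might look like the sketch below. It assumes the classes generated from this proto file (package and outer class name follow the java_package and java_outer_classname options); the class name StoreFileListRoundTrip is purely illustrative.

    import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

    public class StoreFileListRoundTrip {
      public static void main(String[] args) throws Exception {
        // Both fields of StoreFileEntry and the timestamp are required, so all must be set.
        StoreFileList list = StoreFileList.newBuilder()
          .setTimestamp(System.currentTimeMillis())
          .addStoreFile(StoreFileEntry.newBuilder().setName("hfile-abc123").setSize(1024).build())
          .build();
        // Serialize and parse back, as StoreFileListFile does with the on-disk track file.
        byte[] bytes = list.toByteArray();
        StoreFileList parsed = StoreFileList.parseFrom(bytes);
        System.out.println(parsed.getStoreFileCount() + " file(s), ts=" + parsed.getTimestamp());
      }
    }
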
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
index 2a9f968..588f8f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.function.Supplier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -109,6 +110,10 @@ public final class StoreContext implements HeapSize {
     return coprocessorHost;
   }
 
+  public TableName getTableName() {
+    return getRegionInfo().getTable();
+  }
+
   public RegionInfo getRegionInfo() {
     return regionFileSystem.getRegionInfo();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 9f0993f..e6f9395 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -172,9 +172,9 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
     return this.storeFlusher;
   }
 
-  private StoreFileTracker createStoreFileTracker(HStore store) {
-    return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
-      store.isPrimaryReplicaStore(), store.getStoreContext());
+  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
+    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
+      store.getStoreContext());
   }
 
   /**
@@ -205,7 +205,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
     this.ctx = store.getStoreContext();
     this.coprocessorHost = store.getHRegion().getCoprocessorHost();
     this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
-    this.storeFileTracker = createStoreFileTracker(store);
+    this.storeFileTracker = createStoreFileTracker(conf, store);
     assert compactor != null && compactionPolicy != null && storeFileManager != null &&
       storeFlusher != null && storeFileTracker != null;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
index d4c9a86..fa04481 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.StoreContext;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -33,9 +32,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class DefaultStoreFileTracker extends StoreFileTrackerBase {
 
-  public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+  public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
     StoreContext ctx) {
-    super(conf, tableName, isPrimaryReplica, ctx);
+    super(conf, isPrimaryReplica, ctx);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
new file mode 100644
index 0000000..de28b0e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
+
+/**
+ * A file based store file tracker.
+ * <p/>
+ * With this tracking approach, the store file list is persisted to a file, so we can write new
+ * store files directly to the final data directory: since only files recorded in the list are
+ * loaded, broken files will never be picked up. This greatly reduces flush and compaction time
+ * on some object stores, where a rename is actually a copy. It also avoids listing when loading
+ * the store file list, which speeds up the loading of store files as well, since listing is not
+ * a fast operation on most object stores either.
+ */
+@InterfaceAudience.Private
+public class FileBasedStoreFileTracker extends StoreFileTrackerBase {
+
+  private final StoreFileListFile backedFile;
+
+  private final Map<String, StoreFileInfo> storefiles = new HashMap<>();
+
+  public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
+    super(conf, isPrimaryReplica, ctx);
+    backedFile = new StoreFileListFile(ctx);
+  }
+
+  @Override
+  public List<StoreFileInfo> load() throws IOException {
+    StoreFileList list = backedFile.load();
+    if (list == null) {
+      return Collections.emptyList();
+    }
+    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
+    List<StoreFileInfo> infos = new ArrayList<>();
+    for (StoreFileEntry entry : list.getStoreFileList()) {
+      infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
+        ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
+        new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName())));
+    }
+    // In general, for a primary replica, the load method is only called once, during
+    // initialization, so we do not need synchronization here. And for secondary replicas, though
+    // the load method can be called multiple times, we never call the other methods, so skipping
+    // synchronization would also be fine.
+    // But the Region interface has a refreshStoreFiles method, which can be called by CPs, and the
+    // RefreshHFilesEndpoint example exposes the refreshStoreFiles method as an RPC, so for safety,
+    // let's still keep the synchronization here.
+    synchronized (storefiles) {
+      for (StoreFileInfo info : infos) {
+        storefiles.put(info.getPath().getName(), info);
+      }
+    }
+    return infos;
+  }
+
+  @Override
+  protected boolean requireWritingToTmpDirFirst() {
+    return false;
+  }
+
+  private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
+    return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
+      .build();
+  }
+
+  @Override
+  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
+    synchronized (storefiles) {
+      StoreFileList.Builder builder = StoreFileList.newBuilder();
+      for (StoreFileInfo info : storefiles.values()) {
+        builder.addStoreFile(toStoreFileEntry(info));
+      }
+      for (StoreFileInfo info : newFiles) {
+        builder.addStoreFile(toStoreFileEntry(info));
+      }
+      backedFile.update(builder);
+      for (StoreFileInfo info : newFiles) {
+        storefiles.put(info.getPath().getName(), info);
+      }
+    }
+  }
+
+  @Override
+  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException {
+    Set<String> compactedFileNames =
+      compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
+    synchronized (storefiles) {
+      StoreFileList.Builder builder = StoreFileList.newBuilder();
+      storefiles.forEach((name, info) -> {
+        if (compactedFileNames.contains(name)) {
+          return;
+        }
+        builder.addStoreFile(toStoreFileEntry(info));
+      });
+      for (StoreFileInfo info : newFiles) {
+        builder.addStoreFile(toStoreFileEntry(info));
+      }
+      backedFile.update(builder);
+      for (String name : compactedFileNames) {
+        storefiles.remove(name);
+      }
+      for (StoreFileInfo info : newFiles) {
+        storefiles.put(info.getPath().getName(), info);
+      }
+    }
+  }
+}
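
The heart of doAddCompactionResults above is a merge: drop the entries consumed by the compaction, keep the rest, append the compaction outputs, persist the merged list, and only then mutate the in-memory map. A standalone sketch of that merge over plain maps, with hypothetical names (CompactionMergeSketch, merge), just to make the ordering explicit:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CompactionMergeSketch {

      // Post-compaction view: current files minus the compacted ones, plus the new ones.
      static Map<String, Long> merge(Map<String, Long> current, Set<String> compactedNames,
          Map<String, Long> newFiles) {
        Map<String, Long> result = new HashMap<>(current);
        result.keySet().removeAll(compactedNames); // files consumed by the compaction
        result.putAll(newFiles); // compaction outputs
        return result;
      }

      public static void main(String[] args) {
        Map<String, Long> current = new HashMap<>();
        current.put("hfile-a", 10L);
        current.put("hfile-b", 20L);
        Map<String, Long> newFiles = new HashMap<>();
        newFiles.put("hfile-c", 28L);
        Set<String> compacted = new HashSet<>(Arrays.asList("hfile-a", "hfile-b"));
        // The tracker persists this merged view via backedFile.update(...) first, and mutates
        // its in-memory map only after the write succeeds.
        System.out.println(merge(current, compacted, newFiles)); // {hfile-c=28}
      }
    }
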
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java
new file mode 100644
index 0000000..c778bfc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
+
+/**
+ * To fully avoid listing, we use two files for tracking. When loading, we try to read both
+ * files: if only one exists, we trust it; if both exist, we compare the timestamps and trust
+ * the newer one. We also record in memory which of the two files is currently trusted, and
+ * when we need to update the store file list, we write to the other file.
+ * <p/>
+ * This way we can avoid listing even when we want to load the store file list file itself.
+ */
+@InterfaceAudience.Private
+class StoreFileListFile {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);
+
+  private static final String TRACK_FILE_DIR = ".filelist";
+
+  private static final String TRACK_FILE = "f1";
+
+  private static final String TRACK_FILE_ROTATE = "f2";
+
+  private final StoreContext ctx;
+
+  private final Path trackFileDir;
+
+  private final Path[] trackFiles = new Path[2];
+
+  // this is used to make sure that we do not go backwards
+  private long prevTimestamp = -1;
+
+  private int nextTrackFile = -1;
+
+  StoreFileListFile(StoreContext ctx) {
+    this.ctx = ctx;
+    trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
+    trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
+    trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
+  }
+
+  private StoreFileList load(Path path) throws IOException {
+    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
+    byte[] bytes;
+    try (FSDataInputStream in = fs.open(path)) {
+      bytes = ByteStreams.toByteArray(in);
+    }
+    // Read all the bytes first and then parse them, so the only exception we can throw here is
+    // InvalidProtocolBufferException. This is very important for the upper layer to determine
+    // whether this is the normal case, where the file does not exist or is incomplete. If there is
+    // another type of exception, the upper layer should throw it out instead of just ignoring it,
+    // otherwise it will lead to data loss.
+    return StoreFileList.parseFrom(bytes);
+  }
+
+  private int select(StoreFileList[] lists) {
+    if (lists[0] == null) {
+      return 1;
+    }
+    if (lists[1] == null) {
+      return 0;
+    }
+    return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
+  }
+
+  StoreFileList load() throws IOException {
+    StoreFileList[] lists = new StoreFileList[2];
+    for (int i = 0; i < 2; i++) {
+      try {
+        lists[i] = load(trackFiles[i]);
+      } catch (FileNotFoundException | InvalidProtocolBufferException e) {
+        // this is the normal case, so log at info level and do not log the stacktrace
+        LOG.info("Failed to load track file {}: {}", trackFiles[i], e);
+      }
+    }
+    int winnerIndex = select(lists);
+    if (lists[winnerIndex] != null) {
+      nextTrackFile = 1 - winnerIndex;
+      prevTimestamp = lists[winnerIndex].getTimestamp();
+    } else {
+      nextTrackFile = 0;
+    }
+    return lists[winnerIndex];
+  }
+
+  /**
+   * We set the timestamp in this method, so just pass the builder in.
+   */
+  void update(StoreFileList.Builder builder) throws IOException {
+    Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
+    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
+    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
+    try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
+      builder.setTimestamp(timestamp).build().writeTo(out);
+    }
+    // record timestamp
+    prevTimestamp = timestamp;
+    // rotate the file
+    nextTrackFile = 1 - nextTrackFile;
+    try {
+      fs.delete(trackFiles[nextTrackFile], false);
+    } catch (IOException e) {
+      // We create the new file with overwrite = true, so a failed delete is not a big deal here;
+      // deleting only speeds up loading, since we then skip reading this file when loading (we
+      // will hit FileNotFoundException instead).
+      LOG.debug("Failed to delete old track file {}, not a big deal, just ignore",
+        trackFiles[nextTrackFile], e);
+    }
+  }
+}
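
Because each track file is just a serialized StoreFileList message (update() calls writeTo on the stream, and load() hands all the bytes to parseFrom), the files can also be inspected offline. A hedged sketch of a hypothetical debugging helper, TrackFileDump, that reads .filelist/f1 and .filelist/f2 from a family directory copied to the local filesystem and applies the same newest-timestamp rule as select():

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

    public class TrackFileDump {
      // Parses one track file, or returns null if it is missing/corrupt, mirroring load(Path).
      static StoreFileList tryParse(Path p) {
        try {
          return StoreFileList.parseFrom(Files.readAllBytes(p));
        } catch (IOException e) {
          return null;
        }
      }

      public static void main(String[] args) {
        // args[0]: a column family directory copied to the local filesystem
        Path dir = Paths.get(args[0], ".filelist");
        StoreFileList f1 = tryParse(dir.resolve("f1"));
        StoreFileList f2 = tryParse(dir.resolve("f2"));
        // Same selection rule as StoreFileListFile.select(): trust the newer timestamp.
        StoreFileList winner;
        if (f1 == null) {
          winner = f2;
        } else if (f2 == null) {
          winner = f1;
        } else {
          winner = f1.getTimestamp() >= f2.getTimestamp() ? f1 : f2;
        }
        if (winner == null) {
          System.out.println("no readable track file");
          return;
        }
        for (StoreFileEntry entry : winner.getStoreFileList()) {
          System.out.println(entry.getName() + " " + entry.getSize());
        }
      }
    }
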
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index 2451f45..92c6992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Collection;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
@@ -51,18 +50,14 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
 
   protected final Configuration conf;
 
-  protected final TableName tableName;
-
   protected final boolean isPrimaryReplica;
 
   protected final StoreContext ctx;
 
   private volatile boolean cacheOnWriteLogged;
 
-  protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica,
-    StoreContext ctx) {
+  protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
     this.conf = conf;
-    this.tableName = tableName;
     this.isPrimaryReplica = isPrimaryReplica;
     this.ctx = ctx;
   }
@@ -95,7 +90,7 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
       .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
       .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
       .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
-      .withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build();
+      .withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator()).build();
     return hFileContext;
   }
 
@@ -153,7 +148,7 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
       outputDir =
         new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
     } else {
-      throw new UnsupportedOperationException("not supported yet");
+      outputDir = ctx.getFamilyStoreDirectoryPath();
     }
     StoreFileWriter.Builder builder =
       new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
index 4f7231b..6cdfaf4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hbase.regionserver.storefiletracker;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -28,8 +28,12 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public final class StoreFileTrackerFactory {
 
-  public static StoreFileTracker create(Configuration conf, TableName tableName,
-    boolean isPrimaryReplica, StoreContext ctx) {
-    return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
+  public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
+
+  public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
+    StoreContext ctx) {
+    Class<? extends StoreFileTracker> tracker =
+      conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
+    return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
   }
 }
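
Since the factory now reads the implementation class from hbase.store.file-tracker.impl, switching trackers is a configuration change; the test below does exactly this. A minimal sketch, assuming it lives in the same package (the class name SelectTrackerExample is hypothetical):

    package org.apache.hadoop.hbase.regionserver.storefiletracker;

    import org.apache.hadoop.conf.Configuration;

    public class SelectTrackerExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Pick the file based tracker instead of DefaultStoreFileTracker.
        conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, FileBasedStoreFileTracker.class,
          StoreFileTracker.class);
        // StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx) will now instantiate it via
        // ReflectionUtils.newInstance, so any custom implementation must provide a matching
        // (Configuration, boolean, StoreContext) constructor.
      }
    }
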
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestRegionWithFileBasedStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestRegionWithFileBasedStoreFileTracker.java
new file mode 100644
index 0000000..3bc60d1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestRegionWithFileBasedStoreFileTracker.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionWithFileBasedStoreFileTracker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionWithFileBasedStoreFileTracker.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final TableDescriptor TD =
+    TableDescriptorBuilder.newBuilder(TableName.valueOf("file_based_tracker"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
+
+  private static final RegionInfo RI = RegionInfoBuilder.newBuilder(TD.getTableName()).build();
+
+  @Rule
+  public TestName name = new TestName();
+
+  private HRegion region;
+
+  @Before
+  public void setUp() throws IOException {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    conf.setClass(StoreFileTrackerFactory.TRACK_IMPL, FileBasedStoreFileTracker.class,
+      StoreFileTracker.class);
+    region =
+      HBaseTestingUtil.createRegionAndWAL(RI, UTIL.getDataTestDir(name.getMethodName()), conf, TD);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (region != null) {
+      HBaseTestingUtil.closeRegionAndWAL(region);
+    }
+    UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void testFlushAndCompaction() throws IOException {
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        int v = i * 10 + j;
+        region.put(new Put(Bytes.toBytes(v)).addColumn(CF, CQ, Bytes.toBytes(v)));
+      }
+      region.flush(true);
+      if (i % 3 == 2) {
+        region.compact(true);
+      }
+    }
+    // reopen the region to make sure the store file tracker works, i.e., we can get all the
+    // records back
+    region.close();
+    region = HRegion.openHRegion(region, null);
+    for (int i = 0; i < 100; i++) {
+      Result result = region.get(new Get(Bytes.toBytes(i)));
+      assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
+    }
+  }
+}