Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/02/10 17:21:51 UTC

[GitHub] [hbase] joshelser commented on a change in pull request #2931: HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager

joshelser commented on a change in pull request #2931:
URL: https://github.com/apache/hbase/pull/2931#discussion_r573100029



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileTrackingUtils.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to support persistent storefile tracking
+ */
+@InterfaceAudience.Private
+public final class StorefileTrackingUtils {
+
+  private static Logger LOG = LoggerFactory.getLogger(StorefileTrackingUtils.class);
+  public static final long SLEEP_DELTA_MS = TimeUnit.MILLISECONDS.toMillis(100);
+
+  private StorefileTrackingUtils() {
+    // private for utility class
+  }
+
+  public static boolean isStorefileTrackingPersistEnabled(Configuration conf) {
+    boolean isStoreTrackingPersistEnabled =
+      conf.getBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED,
+        HConstants.DEFAULT_STOREFILE_TRACKING_PERSIST_ENABLED);
+    boolean isPersistedStoreEngineSet =
+      conf.get(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName())
+        .equals(PersistedStoreEngine.class.getName());
+    boolean isFeatureEnabled = isStoreTrackingPersistEnabled && isPersistedStoreEngineSet;
+    if (isStoreTrackingPersistEnabled ^ isPersistedStoreEngineSet) {
+      // check if both configuration are correct.
+      String errorMessage = String.format("please set %s to true and set store engine key %s to %s "
+          + "to enable persist storefile tracking",
+        HConstants.STOREFILE_TRACKING_PERSIST_ENABLED,
+        StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+      throw new IllegalArgumentException(errorMessage);
+    }
+    return isFeatureEnabled;
+  }
+
+  /**
+   * If the storefile tracking feature is configured, initialize the hbase:storefile table and
+   * wait for it to be online. Otherwise, look for the hbase:storefile table and remove it.
+   *
+   * @param masterServices masterServices
+   * @throws IOException if hbase:storefile table cannot be initialized and be online
+   */
+  public static void init(MasterServices masterServices) throws IOException {
+    createStorefileTable(masterServices);
+    waitForStoreFileTableOnline(masterServices);
+  }
+
+  /**
+   * Cleans up all storefile-related state on the cluster: disables and deletes hbase:storefile
+   * if found.
+   * @param masterServices {@link MasterServices}
+   * @throws IOException if failures
+   */
+  private static void cleanup(MasterServices masterServices) throws IOException {
+    try {
+      masterServices.getConnection().getAdmin().disableTable(TableName.STOREFILE_TABLE_NAME);
+      masterServices.getConnection().getAdmin().deleteTable(TableName.STOREFILE_TABLE_NAME);
+    } catch (IOException ex) {
+      final String message = "Failed disable and deleting table " +
+        TableName.STOREFILE_TABLE_NAME.getNameAsString();
+      LOG.error(message);
+      throw new IOException(message, ex);
+    }
+  }
+
+  public static StoreFilePathAccessor createStoreFilePathAccessor(Configuration conf,
+    Connection connection) {
+    return new HTableStoreFilePathAccessor(conf, connection);
+  }
+
+  public static List<Path> convertStoreFilesToPaths(Collection<HStoreFile> storeFiles) {
+    return storeFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
+  }
+
+  private static void createStorefileTable(MasterServices masterServices)
+    throws IOException {
+    if (MetaTableAccessor.getTableState(masterServices.getConnection(),
+      TableName.STOREFILE_TABLE_NAME) == null) {
+      LOG.info("{} table not found. Creating...", TableName.STOREFILE_TABLE_NAME);
+      masterServices.createSystemTable(HTableStoreFilePathAccessor.STOREFILE_TABLE_DESC);
+    }
+  }
+
+  private static void waitForStoreFileTableOnline(MasterServices masterServices)
+    throws IOException {
+    try {
+      long startTime = EnvironmentEdgeManager.currentTime();
+      long timeout = masterServices.getConfiguration()
+        .getLong(HConstants.STOREFILE_TRACKING_INIT_TIMEOUT,
+          HConstants.DEFAULT_STOREFILE_TRACKING_INIT_TIMEOUT);
+      while (!isStoreFileTableAssignedAndEnabled(masterServices)) {

Review comment:
       I'm having nightmares about the hbase:namespace table: when it didn't get assigned, the HBase master would just crashloop, stuck in a cycle waiting on an assignment that never came.
   
   Not something that needs to be handled right now, but we should make sure we think about this later (rather than reintroduce old problems)
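    For what it's worth, a minimal sketch of the kind of bounded wait I mean, reusing the
    patch's own timeout config and SLEEP_DELTA_MS (isStoreFileTableAssignedAndEnabled is
    assumed from the patch; this is an illustration, not a drop-in implementation):

        private static void waitForStoreFileTableOnline(MasterServices masterServices)
          throws IOException {
          long startTime = EnvironmentEdgeManager.currentTime();
          long timeout = masterServices.getConfiguration().getLong(
            HConstants.STOREFILE_TRACKING_INIT_TIMEOUT,
            HConstants.DEFAULT_STOREFILE_TRACKING_INIT_TIMEOUT);
          while (!isStoreFileTableAssignedAndEnabled(masterServices)) {
            if (EnvironmentEdgeManager.currentTime() - startTime > timeout) {
              // Fail initialization once with a clear error instead of letting the master
              // spin forever waiting on an assignment that never comes.
              throw new IOException("Timed out (" + timeout + "ms) waiting for "
                + TableName.STOREFILE_TABLE_NAME + " to be assigned and enabled");
            }
            try {
              Thread.sleep(SLEEP_DELTA_MS);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              // java.io.InterruptedIOException extends IOException
              throw (IOException) new InterruptedIOException().initCause(e);
            }
          }
        }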

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
##########
@@ -0,0 +1,101 @@
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Helper class to interact with the hbase storefile tracking data persisted as off-memory data
+ * from the {@link StoreFileManager}
+ *
+ * There is currently only one tracked set of storefiles, 'included'.
+ *
+ * e.g. the list of storefile paths in 'included' should be an identical copy of the in-memory
+ * {@link HStoreFile} Path(s) and can be reused during region opens and region reassignment.
+ */
+@InterfaceAudience.Private
+public interface StoreFilePathAccessor {
+
+  /**
+   * Create the storefile tracking table with the help of the masterServices
+   * @param masterServices instance of HMaster
+   * @throws IOException if Master is not running or connection has been lost
+   */
+  void initialize(final MasterServices masterServices) throws IOException;
+
+  /**
+   * GET storefile paths from the 'included' data set

Review comment:
       nit: Get

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
##########
@@ -0,0 +1,101 @@
+  /**
+   * Delete storefile paths for a tracking column family, normally used when a region-store is
+   * completely removed due to region split or merge
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @throws IOException if a remote or network exception occurs during delete
+   */
+  void deleteStoreFilePaths(final String tableName, final String regionName, final String storeName)

Review comment:
       How does this method relate to `deleteRegion`? Would this be used for a `truncate`-like operation?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
##########
@@ -203,5 +213,21 @@ public double getCompactionPressure() {
   public Comparator<HStoreFile> getStoreFileComparator() {
     return storeFileComparator;
   }
+
+  void setStorefiles(ImmutableList<HStoreFile> storefiles) {

Review comment:
       nit `setStoreFiles`

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
##########
@@ -0,0 +1,227 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Helper class to interact with the hbase:storefile system table
+ *
+ * <pre>
+ *   ROW-KEY              FAMILY:QUALIFIER      DATA VALUE
+ *   region-store-table   included:files        List&lt;Path&gt; filesIncludedInRead
+ * </pre>
+ *
+ * The region encoded name is used as the row-key prefix to spread load across region splits, and
+ * the target table name as the suffix so that operators can identify the records per table.
+ *
+ * included:files is used for persisting the storefiles of StoreFileManager when a store opens
+ * or closes. Meanwhile, compactedFiles of StoreFileManager isn't tracked off-memory, because
+ * the updated included:files contains the compactedFiles and any leftover compactedFiles are
+ * archived when a store closes or opens.
+ *
+ * TODO we will need a followup change to track in-memory temporary files, such that we can later
+ *      clean up untracked temporary storefiles left over from a flush or compaction when a
+ *      regionserver crashes without closing the store properly
+ */
+
+@InterfaceAudience.Private
+public class HTableStoreFilePathAccessor extends AbstractStoreFilePathAccessor {
+
+  public static final byte[] STOREFILE_FAMILY_INCLUDED = Bytes.toBytes(STOREFILE_INCLUDED_STR);
+
+  private static final String DASH_SEPARATOR = "-";
+  private static final String STOREFILE_QUALIFIER_STR = "filepaths";
+  private static final byte[] STOREFILE_QUALIFIER = Bytes.toBytes(STOREFILE_QUALIFIER_STR);
+  private static final int STOREFILE_TABLE_VERSIONS = 3;
+
+  // TODO find a way for system table to support region split at table creation or remove this
+  //  comment when we merge into hbase:meta table
+  public static final TableDescriptor STOREFILE_TABLE_DESC =
+    TableDescriptorBuilder.newBuilder(TableName.STOREFILE_TABLE_NAME)
+      .setColumnFamily(
+        ColumnFamilyDescriptorBuilder.newBuilder(STOREFILE_FAMILY_INCLUDED)
+          .setMaxVersions(STOREFILE_TABLE_VERSIONS)
+          .setInMemory(true)
+          .build())
+      .setRegionSplitPolicyClassName(BusyRegionSplitPolicy.class.getName())
+      .build();
+
+  private Connection connection;
+
+  public HTableStoreFilePathAccessor(Configuration conf, Connection connection) {
+    super(conf);
+    Preconditions.checkNotNull(connection, "connection cannot be null");
+    this.connection = connection;
+  }
+
+  @Override
+  public void initialize(final MasterServices masterServices) throws IOException {
+    StorefileTrackingUtils.init(masterServices);
+  }
+
+  @Override
+  List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String colFamily) throws IOException {
+    validate(tableName, regionName, storeName, colFamily);
+    byte[] colFamilyBytes = Bytes.toBytes(colFamily);
+    Get get =
+      new Get(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    get.addColumn(colFamilyBytes, STOREFILE_QUALIFIER);
+    Result result = doGet(get);
+    if (result == null || result.isEmpty()) {
+      return new ArrayList<>();
+    }
+    return byteToStoreFileList(result.getValue(colFamilyBytes, STOREFILE_QUALIFIER));
+  }
+
+  @Override
+  public void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, StoreFilePathUpdate storeFilePathUpdate)
+    throws IOException {
+    validate(tableName, regionName, storeName, storeFilePathUpdate);
+    Put put = generatePutForStoreFilePaths(tableName, regionName, storeName, storeFilePathUpdate);
+    doPut(put);
+  }
+
+  private Put generatePutForStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final StoreFilePathUpdate storeFilePathUpdate) {
+    Put put = new Put(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    if (storeFilePathUpdate.hasStoreFilesUpdate()) {
+      put.addColumn(Bytes.toBytes(STOREFILE_INCLUDED_STR), STOREFILE_QUALIFIER,
+        storeFileListToByteArray(storeFilePathUpdate.getStoreFiles()));
+    }
+    return put;
+  }
+
+  @Override
+  public void deleteStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException {
+    validate(tableName, regionName, storeName);
+    Delete delete = new Delete(
+      Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    delete.addColumns(STOREFILE_FAMILY_INCLUDED, STOREFILE_QUALIFIER);
+    doDelete(Lists.newArrayList(delete));
+  }
+
+  @Override
+  public void deleteRegion(String regionName) throws IOException {
+    Scan scan = getScanWithFilter(regionName);
+    List<Delete> familiesToDelete = new ArrayList<>();
+    for (Result result : getResultScanner(scan)) {
+      String rowKey = Bytes.toString(result.getRow());
+      Delete delete = new Delete(Bytes.toBytes(rowKey));
+      familiesToDelete.add(delete);
+    }
+    doDelete(familiesToDelete);
+  }
+
+  @Override
+  public Set<String> getTrackedFamilies(String tableName, String regionName)
+    throws IOException {
+    // find all rows by regionName
+    Scan scan = getScanWithFilter(regionName);
+
+    Set<String> families = new HashSet<>();
+    for (Result result : getResultScanner(scan)) {
+      String rowKey = Bytes.toString(result.getRow());
+      String family =
+        StorefileTrackingUtils.getFamilyFromKey(rowKey, tableName, regionName, getSeparator());
+      families.add(family);
+    }
+    return families;
+  }
+
+  @Override
+  String getSeparator() {
+    return DASH_SEPARATOR;
+  }
+
+  private Result doGet(final Get get) throws IOException {
+    try (Table table = getConnection().getTable(TableName.STOREFILE_TABLE_NAME)) {
+      return table.get(get);
+    }
+  }
+
+  private void doPut(final Put put) throws IOException {
+    try (Table table = getConnection().getTable(TableName.STOREFILE_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void doDelete(final List<Delete> delete) throws IOException {
+    try (Table table = getConnection().getTable(TableName.STOREFILE_TABLE_NAME)) {
+      table.delete(delete);
+    }
+  }
+
+  private ResultScanner getResultScanner(final Scan scan) throws IOException {
+    try (Table table = getConnection().getTable(TableName.STOREFILE_TABLE_NAME)) {
+      return table.getScanner(scan);
+    }
+  }
+
+  private Connection getConnection() throws IOException {
+    if (connection == null) {
+      throw new IOException("Connection should be provided by region server "
+        + "and should not be null after initialized.");
+    }
+    return connection;
+  }
+
+  private Scan getScanWithFilter(String regionName) {
+    Scan scan = new Scan();
+    String regexPattern = "^" + regionName + getSeparator();
+    RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL,
+      new RegexStringComparator(regexPattern));

Review comment:
       Prefix filter would do the same thing without a regex cost, no?
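    e.g. a sketch of what I mean, assuming regionName + separator really is a literal
    prefix of the row key, as the key layout above suggests:

        byte[] prefix = Bytes.toBytes(regionName + getSeparator());
        // Bounds the scan to the prefix range up front; no per-row regex evaluation.
        Scan scan = new Scan().setRowPrefixFilter(prefix);
        // Or, if an explicit filter is preferred:
        // scan.setFilter(new org.apache.hadoop.hbase.filter.PrefixFilter(prefix));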

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
##########
@@ -0,0 +1,111 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.List;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;

Review comment:
       Do we have commons-lang in hbase-thirdparty?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractStoreFilePathAccessor.java
##########
@@ -0,0 +1,111 @@
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+public abstract class AbstractStoreFilePathAccessor implements StoreFilePathAccessor {
+
+  public static final String STOREFILE_INCLUDED_STR = "included";
+
+  protected static final String LIST_SEPARATOR = ";";
+  protected final Configuration conf;
+
+  public AbstractStoreFilePathAccessor(Configuration conf) {
+    this.conf = conf;
+  }
+
+  abstract String getSeparator();
+
+  abstract List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String columnName) throws IOException;
+
+  @Override
+  public abstract void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, StoreFilePathUpdate storeFilePathUpdate)
+    throws IOException;
+
+  @Override
+  public List<Path> getIncludedStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException {
+    return getStoreFilePaths(tableName, regionName, storeName, STOREFILE_INCLUDED_STR);
+  }
+
+  protected static byte[] storeFileListToByteArray(List<Path> storeFilePaths) {
+    return Bytes.toBytes(Joiner.on(LIST_SEPARATOR).join(storeFilePaths));
+  }
+
+  protected static List<Path> byteToStoreFileList(byte[] data) {
+    List<Path> paths = new ArrayList<>();
+    if (data != null && data.length != 0) {
+      String pathString = Bytes.toString(data);
+      String[] pathStrings = pathString.split(LIST_SEPARATOR);

Review comment:
       This is doing a regex operation to split this. Should make sure we're doing something which avoids that.
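    e.g. the shaded Guava Splitter (the counterpart of the Joiner already used in this
    class) splits on a literal separator with no regex involved; a sketch:

        // org.apache.hbase.thirdparty.com.google.common.base.Splitter
        List<Path> paths = new ArrayList<>();
        for (String pathString : Splitter.on(LIST_SEPARATOR).split(Bytes.toString(data))) {
          paths.add(new Path(pathString));
        }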

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileTrackingUtils.java
##########
@@ -0,0 +1,153 @@
+@InterfaceAudience.Private
+public final class StorefileTrackingUtils {
+
+  private static Logger LOG = LoggerFactory.getLogger(StorefileTrackingUtils.class);
+  public static final long SLEEP_DELTA_MS = TimeUnit.MILLISECONDS.toMillis(100);

Review comment:
       private instead of public?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
##########
@@ -0,0 +1,227 @@
+  @Override
+  List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String colFamily) throws IOException {
+    validate(tableName, regionName, storeName, colFamily);
+    byte[] colFamilyBytes = Bytes.toBytes(colFamily);
+    Get get =
+      new Get(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    get.addColumn(colFamilyBytes, STOREFILE_QUALIFIER);
+    Result result = doGet(get);
+    if (result == null || result.isEmpty()) {

Review comment:
       meta-question: do we actually return `null` via a `Get` in some cases? That seems silly.
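    As far as I know, Table#get returns an empty Result for a missing row rather than
    null, so the isEmpty() check alone should be enough, i.e.:

        Result result = doGet(get);
        if (result.isEmpty()) {  // Table#get never returns null for a missing row
          return new ArrayList<>();
        }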

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
##########
@@ -0,0 +1,227 @@
+  private Connection getConnection() throws IOException {
+    if (connection == null) {
+      throw new IOException("Connection should be provided by region server "
+        + "and should not be null after initialized.");
+    }

Review comment:
       Make `connection` final and then you can drop this.
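    i.e. something like this sketch, validating once in the constructor so the getter
    needs no null check at all:

        private final Connection connection;

        public HTableStoreFilePathAccessor(Configuration conf, Connection connection) {
          super(conf);
          // Preconditions.checkNotNull returns its argument, so validate and assign in one step.
          this.connection = Preconditions.checkNotNull(connection, "connection cannot be null");
        }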

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java
##########
@@ -0,0 +1,191 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSortedSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * A storefile manager used by {@link PersistedStoreEngine} that persists the in-memory
+ * storefile tracking to the persistent table hbase:storefile.
+ *
+ * We don't override {@link #clearFiles()} from {@link DefaultStoreFileManager}, so the persisted
+ * storefile tracking is kept and can be reused when the region is reassigned to a different
+ * region server.
+ */
+@InterfaceAudience.Private
+public class PersistedStoreFileManager extends DefaultStoreFileManager {
+  private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFileManager.class);
+  private final RegionInfo regionInfo;
+  private final String tableName;
+  private final String regionName;
+  private final String storeName;
+  private StoreFilePathAccessor accessor;
+  private Configuration conf;
+  // only uses for warmupHRegion
+  private boolean readOnly;
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor, boolean readOnly) {
+    super(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, familyName);
+    this.conf = conf;
+    this.regionInfo = regionInfo;
+    this.tableName = regionInfo.getTable().getNameAsString();
+    this.regionName = regionInfo.getEncodedName();
+    this.storeName = familyName;
+    this.accessor = accessor;
+    this.readOnly = readOnly;
+  }
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor) {
+    this(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, regionInfo,
+      familyName, accessor, false);
+  }
+
+  /**
+   * Loads the specified storeFiles to the StoreFileManager to be included in reads.
+   *
+   * @param storeFiles list of storefiles to be loaded; may be an empty list. Throws an exception
+   *                   if it is null.
+   */
+  @Override
+  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
+    Preconditions.checkArgument(storeFiles != null, "store files cannot be "
+      + "null when loading");
+    if (storeFiles.isEmpty()) {
+      LOG.warn("Other than fresh region with no store files, store files should not be empty");
+      return;
+    }
+    ImmutableList<HStoreFile> sortedStorefiles =
+      ImmutableList.sortedCopyOf(getStoreFileComparator(), storeFiles);
+    setStorefiles(sortedStorefiles);
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(sortedStorefiles).build());
+  }
+
+  @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    List<Path> pathList = accessor.getIncludedStoreFilePaths(tableName, regionName, storeName);
+    boolean isEmptyInPersistedFilePaths = CollectionUtils.isEmpty(pathList);
+    if (isEmptyInPersistedFilePaths) {
+      // When the path accessor returns an empty result, we scan the file storage
+      // to see if there are any existing HFiles that should be loaded.
+      // The scan is a one-time process when the store opens during region assignment.
+      //
+      // this is especially used for region and store open
+      // 1. First time migration from a filesystem based e.g. DefaultStoreFileEngine
+      // 2. After region split and merge
+      // 3. After table clone and create new HFiles directly into data directory
+      //
+      // Also we don't handle the inconsistency between storefile tracking and file system, which
+      // will be handled by an HBCK command
+      LOG.info("Cannot find tracking paths ({}) for store {} in region {} of "
+          + "table {}, fall back to scan the storage to get a list of storefiles to be opened"
+        , isEmptyInPersistedFilePaths, storeName, regionName,
+        tableName);
+      return getRegionFs().getStoreFiles(getFamilyName());
+    }
+    ArrayList<StoreFileInfo> storeFiles = new ArrayList<>();
+    for (Path storeFilePath : pathList) {
+      if (!StoreFileInfo.isValid(getRegionFs().getFileSystem().getFileStatus(storeFilePath))) {
+        // TODO add a comment how this file will be removed when we have cleaner
+        LOG.warn("Invalid StoreFile: {}", storeFilePath);
+        continue;
+      }
+      StoreFileInfo info = ServerRegionReplicaUtil
+        .getStoreFileInfo(conf, getRegionFs().getFileSystem(), regionInfo,
+          ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo), getFamilyName(),
+          storeFilePath);
+      storeFiles.add(info);
+    }
+    return storeFiles;
+  }
+
+  @Override
+  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
+    // Return in case of empty storeFiles as it is a no-op; empty files are expected during region close
+    if (CollectionUtils.isEmpty(sfs)) {
+      return;
+    }
+    ImmutableList<HStoreFile> storefiles = ImmutableList
+      .sortedCopyOf(getStoreFileComparator(), Iterables.concat(getStorefiles(), sfs));
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(storefiles).build());
+    setStorefiles(storefiles);
+  }
+
+  @Override
+  public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,

Review comment:
       nit `newCompactedFiles`

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Helper class to interact with the hbase:storefile system table
+ *
+ * <pre>
+ *   ROW-KEY              FAMILY:QUALIFIER      DATA VALUE
+ *   region-store-table   included:files        List&lt;Path&gt; filesIncludedInRead
+ * </pre>
+ *
+ * The region encoded name is set as prefix for region split loading balance, and we use the
+ * target table name as suffix such that operator can identify the records per table.
+ *
+ * included:files is used for persisting storefiles of StoreFileManager in the cases of store
+ * opens and store closes. Meanwhile compactedFiles of StoreFileManager isn't being tracked
+ * off-memory, because the updated included:files contains compactedFiles and the leftover
+ * compactedFiles are either archived when a store closes or opens.
+ *
+ * TODO we will need a followup change to introduce in-memory temporarily file, such that further
+ *      we can introduce a non-tracking temporarily storefiles left from a flush or compaction when
+ *      a regionserver crashes without closing the store properly
+ */
+
+@InterfaceAudience.Private
+public class HTableStoreFilePathAccessor extends AbstractStoreFilePathAccessor {
+
+  public static final byte[] STOREFILE_FAMILY_INCLUDED = Bytes.toBytes(STOREFILE_INCLUDED_STR);
+
+  private static final String DASH_SEPARATOR = "-";
+  private static final String STOREFILE_QUALIFIER_STR = "filepaths";
+  private static final byte[] STOREFILE_QUALIFIER = Bytes.toBytes(STOREFILE_QUALIFIER_STR);
+  private static final int STOREFILE_TABLE_VERSIONS = 3;
+
+  // TODO find a way for system table to support region split at table creation or remove this
+  //  comment when we merge into hbase:meta table
+  public static final TableDescriptor STOREFILE_TABLE_DESC =
+    TableDescriptorBuilder.newBuilder(TableName.STOREFILE_TABLE_NAME)
+      .setColumnFamily(
+        ColumnFamilyDescriptorBuilder.newBuilder(STOREFILE_FAMILY_INCLUDED)
+          .setMaxVersions(STOREFILE_TABLE_VERSIONS)
+          .setInMemory(true)
+          .build())
+      .setRegionSplitPolicyClassName(BusyRegionSplitPolicy.class.getName())
+      .build();
+
+  private Connection connection;
+
+  public HTableStoreFilePathAccessor(Configuration conf, Connection connection) {
+    super(conf);
+    Preconditions.checkNotNull(connection, "connection cannot be null");
+    this.connection = connection;
+  }
+
+  @Override
+  public void initialize(final MasterServices masterServices) throws IOException {
+    StorefileTrackingUtils.init(masterServices);
+  }
+
+  @Override
+  List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String colFamily) throws IOException {
+    validate(tableName, regionName, storeName, colFamily);
+    byte[] colFamilyBytes = Bytes.toBytes(colFamily);
+    Get get =
+      new Get(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    get.addColumn(colFamilyBytes, STOREFILE_QUALIFIER);
+    Result result = doGet(get);
+    if (result == null || result.isEmpty()) {
+      return new ArrayList<>();
+    }
+    return byteToStoreFileList(result.getValue(colFamilyBytes, STOREFILE_QUALIFIER));
+  }
+
+  @Override
+  public void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, StoreFilePathUpdate storeFilePathUpdate)
+    throws IOException {
+    validate(tableName, regionName, storeName, storeFilePathUpdate);
+    Put put = generatePutForStoreFilePaths(tableName, regionName, storeName, storeFilePathUpdate);
+    doPut(put);
+  }
+
+
+  private Put generatePutForStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final StoreFilePathUpdate storeFilePathUpdate) {
+    Put put = new Put(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    if (storeFilePathUpdate.hasStoreFilesUpdate()) {
+      put.addColumn(Bytes.toBytes(STOREFILE_INCLUDED_STR), STOREFILE_QUALIFIER,
+        storeFileListToByteArray(storeFilePathUpdate.getStoreFiles()));
+    }
+    return put;
+  }
+
+  @Override
+  public void deleteStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException {
+    validate(tableName, regionName, storeName);
+    Delete delete = new Delete(
+      Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    delete.addColumns(STOREFILE_FAMILY_INCLUDED, STOREFILE_QUALIFIER);
+    doDelete(Lists.newArrayList(delete));
+  }
+
+  @Override
+  public void deleteRegion(String regionName) throws IOException {
+    Scan scan = getScanWithFilter(regionName);
+    List<Delete> familiesToDelete = new ArrayList<>();
+    for (Result result : getResultScanner(scan)) {
+      familiesToDelete.add(new Delete(result.getRow()));
+    }
+    doDelete(familiesToDelete);
+  }
+
+  @Override
+  public Set<String> getTrackedFamilies(String tableName, String regionName)
+    throws IOException {
+    // find all rows by regionName
+    Scan scan = getScanWithFilter(regionName);
+
+    Set<String> families = new HashSet<>();
+    for (Result result : getResultScanner(scan)) {
+      String rowKey = Bytes.toString(result.getRow());
+      String family =
+        StorefileTrackingUtils.getFamilyFromKey(rowKey, tableName, regionName, getSeparator());
+      families.add(family);

Review comment:
       Would be much better to do this without converting back to Strings.
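       A rough sketch of what I'm imagining (variable names hypothetical), assuming the row key
       layout is <table><sep><region><sep><family> and the return type becomes Set<byte[]>
       ordered by Bytes.BYTES_COMPARATOR:

           // compute the row prefix bytes once, then slice the family straight out of each row
           byte[] prefix = Bytes.toBytes(tableName + getSeparator() + regionName + getSeparator());
           Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
           for (Result result : getResultScanner(scan)) {
             byte[] row = result.getRow();
             families.add(Bytes.copy(row, prefix.length, row.length - prefix.length));
           }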

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
##########
@@ -203,5 +213,21 @@ public double getCompactionPressure() {
   public Comparator<HStoreFile> getStoreFileComparator() {
     return storeFileComparator;
   }
+
+  void setStorefiles(ImmutableList<HStoreFile> storefiles) {
+    this.storefiles = storefiles;
+  }
+
+  void setCompactedfiles(ImmutableList<HStoreFile> compactedfiles) {

Review comment:
       nit `setCompactedFiles`

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Helper class to interact with the hbase storefile tracking data persisted as off-memory data
+ * from the {@link StoreFileManager}
+ *
+ * There is only one set of tracked storefiles, 'included'.
+ *
+ * e.g. the list of storefile paths in 'included' should be an identical copy of the in-memory
+ * {@link HStoreFile} Path(s) and can be reused during region opens and region reassignment.
+ */
+@InterfaceAudience.Private
+public interface StoreFilePathAccessor {
+
+  /**
+   * Create the storefile tracking with the help of the masterServices
+   * @param masterServices instance of HMaster
+   * @throws IOException if Master is not running or connection has been lost
+   */
+  void initialize(final MasterServices masterServices) throws IOException;
+
+  /**
+   * GET storefile paths from the 'included' data set
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @return list of StoreFile paths that should be included in reads in this store,
+   *         returns an empty list if the target cell is empty or doesn't exist.
+   * @throws IOException if a remote or network exception occurs during Get
+   */
+  List<Path> getIncludedStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException;
+
+  /**
+   * Writes the specified updates to the tracking
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @param storeFilePathUpdate Updates to be persisted
+   * @throws IOException if a remote or network exception occurs during write
+   */
+  void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final StoreFilePathUpdate storeFilePathUpdate) throws IOException;
+
+  /**
+   * Delete storefile paths for a tracking column family, normally used when a region-store is
+   * completely removed due to region split or merge
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @throws IOException if a remote or network exception occurs during delete
+   */
+  void deleteStoreFilePaths(final String tableName, final String regionName, final String storeName)
+    throws IOException;
+
+  /**
+   * Returns the families being tracked in the storefile tracking data for the given
+   * table/region

Review comment:
       For user-regions (not meta), is there a reason that we'd only want a subset of Stores to be using tracking data?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileTrackingUtils.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to support persistent storefile tracking
+ */
+@InterfaceAudience.Private
+public final class StorefileTrackingUtils {
+
+  private static Logger LOG = LoggerFactory.getLogger(StorefileTrackingUtils.class);
+  public static final long SLEEP_DELTA_MS = TimeUnit.MILLISECONDS.toMillis(100);
+
+  private StorefileTrackingUtils() {
+    // private for utility class
+  }
+
+  public static boolean isStorefileTrackingPersistEnabled(Configuration conf) {
+    boolean isStoreTrackingPersistEnabled =
+      conf.getBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED,
+        HConstants.DEFAULT_STOREFILE_TRACKING_PERSIST_ENABLED);
+    boolean isPersistedStoreEngineSet =
+      conf.get(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName())
+        .equals(PersistedStoreEngine.class.getName());
+    boolean isFeatureEnabled = isStoreTrackingPersistEnabled && isPersistedStoreEngineSet;
+    if (isStoreTrackingPersistEnabled ^ isPersistedStoreEngineSet) {
+      // check if both configuration are correct.
+      String errorMessage = String.format("please set %s to true and set store engine key %s to %s "
+          + "to enable persist storefile tracking",
+        HConstants.STOREFILE_TRACKING_PERSIST_ENABLED,
+        StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+      throw new IllegalArgumentException(errorMessage);
+    }
+    return isFeatureEnabled;
+  }
+
+  /**
+   * If the storefile tracking feature is configured, initialize the hbase:storefile table and
+   * wait for it to be online. Otherwise, look for the hbase:storefile table and remove it
+   *
+   * @param masterServices masterServices
+   * @throws IOException if the hbase:storefile table cannot be initialized or brought online
+   */
+  public static void init(MasterServices masterServices) throws IOException {
+    createStorefileTable(masterServices);
+    waitForStoreFileTableOnline(masterServices);
+  }
+
+  /**
+   * Cleans up all storefile-related state on the cluster: disables and deletes hbase:storefile
+   * if found
+   * @param masterServices {@link MasterServices}
+   * @throws IOException if the cleanup fails
+   */
+  private static void cleanup(MasterServices masterServices) throws IOException {
+    try {
+      masterServices.getConnection().getAdmin().disableTable(TableName.STOREFILE_TABLE_NAME);
+      masterServices.getConnection().getAdmin().deleteTable(TableName.STOREFILE_TABLE_NAME);
+    } catch (IOException ex) {
+      final String message = "Failed disabling and deleting table " +
+        TableName.STOREFILE_TABLE_NAME.getNameAsString();
+      LOG.error(message);
+      throw new IOException(message, ex);
+    }
+  }
+
+  public static StoreFilePathAccessor createStoreFilePathAccessor(Configuration conf,
+    Connection connection) {
+    return new HTableStoreFilePathAccessor(conf, connection);
+  }
+
+  public static List<Path> convertStoreFilesToPaths(Collection<HStoreFile> storeFiles) {

Review comment:
       nit just call this `toPaths`
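       e.g. (sketch; assuming the current body just maps each HStoreFile to its Path):

           public static List<Path> toPaths(Collection<HStoreFile> storeFiles) {
             return storeFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
           }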

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSortedSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * A Storefile manager that is used by {@link PersistedStoreEngine} that persists the in-memory
+ * storefile tracking to a persistent table hbase:storefile.
+ *
+ * We don't override {@link #clearFiles()} from {@link DefaultStoreFileManager} to also clear
+ * the persisted storefile tracking; the persisted tracking is kept so it can be reused when
+ * the region is reassigned to a different region server.
+ */
+@InterfaceAudience.Private
+public class PersistedStoreFileManager extends DefaultStoreFileManager {
+  private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFileManager.class);
+  private final RegionInfo regionInfo;
+  private final String tableName;
+  private final String regionName;
+  private final String storeName;
+  private StoreFilePathAccessor accessor;
+  private Configuration conf;
+  // only used for warmupHRegion
+  private boolean readOnly;
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor, boolean readOnly) {
+    super(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, familyName);
+    this.conf = conf;
+    this.regionInfo = regionInfo;
+    this.tableName = regionInfo.getTable().getNameAsString();
+    this.regionName = regionInfo.getEncodedName();
+    this.storeName = familyName;
+    this.accessor = accessor;
+    this.readOnly = readOnly;
+  }
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor) {
+    this(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, regionInfo,
+      familyName, accessor, false);
+  }
+
+  /**
+   * Loads the specified storeFiles to the StoreFileManager to be included in reads.
+   *
+   * @param storeFiles list of storefiles to be loaded; may be an empty list. Throws an
+   *                   exception if it is null.
+   */
+  @Override
+  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
+    Preconditions.checkArgument(storeFiles != null, "store files cannot be "
+      + "null when loading");
+    if (storeFiles.isEmpty()) {
+      LOG.warn("Other than fresh region with no store files, store files should not be empty");
+      return;
+    }
+    ImmutableList<HStoreFile> sortedStorefiles =
+      ImmutableList.sortedCopyOf(getStoreFileComparator(), storeFiles);
+    setStorefiles(sortedStorefiles);
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(sortedStorefiles).build());
+  }
+
+  @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    List<Path> pathList = accessor.getIncludedStoreFilePaths(tableName, regionName, storeName);
+    boolean isEmptyInPersistedFilePaths = CollectionUtils.isEmpty(pathList);
+    if (isEmptyInPersistedFilePaths) {
+      // When the path accessor returns an empty result, we scan the file storage to see if
+      // there are any existing HFiles that should be loaded. The scan is a one-time process
+      // that happens when the store opens during region assignment.
+      //
+      // This is especially used for region and store open:
+      // 1. First-time migration from a filesystem-based engine, e.g. DefaultStoreEngine
+      // 2. After region split and merge
+      // 3. After table clone, when new HFiles are created directly in the data directory
+      //
+      // Also, we don't handle inconsistencies between the storefile tracking and the file
+      // system; those will be handled by an HBCK command
+      LOG.info("Cannot find tracking paths (empty={}) for store {} in region {} of "
+          + "table {}, falling back to scanning the storage for storefiles to be opened",
+        isEmptyInPersistedFilePaths, storeName, regionName, tableName);
+      return getRegionFs().getStoreFiles(getFamilyName());
+    }
+    ArrayList<StoreFileInfo> storeFiles = new ArrayList<>();
+    for (Path storeFilePath : pathList) {
+      if (!StoreFileInfo.isValid(getRegionFs().getFileSystem().getFileStatus(storeFilePath))) {
+        // TODO add a comment on how this file will be removed once we have a cleaner
+        LOG.warn("Invalid StoreFile: {}", storeFilePath);
+        continue;
+      }
+      StoreFileInfo info = ServerRegionReplicaUtil
+        .getStoreFileInfo(conf, getRegionFs().getFileSystem(), regionInfo,
+          ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo), getFamilyName(),
+          storeFilePath);
+      storeFiles.add(info);
+    }
+    return storeFiles;
+  }
+
+  @Override
+  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
+    // return early for empty storeFiles as it is a no-op; empty files are expected during region close
+    if (CollectionUtils.isEmpty(sfs)) {
+      return;
+    }
+    ImmutableList<HStoreFile> storefiles = ImmutableList
+      .sortedCopyOf(getStoreFileComparator(), Iterables.concat(getStorefiles(), sfs));
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(storefiles).build());
+    setStorefiles(storefiles);
+  }
+
+  @Override
+  public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
+    Collection<HStoreFile> results) throws IOException {
+    Preconditions.checkNotNull(newCompactedfiles, "compactedFiles cannot be null");
+    Preconditions.checkNotNull(results, "compaction result cannot be null");
+    // only allow distinct path to be included, especially rerun after a compaction fails
+    ImmutableList<HStoreFile> storefiles = ImmutableList.sortedCopyOf(getStoreFileComparator(),
+      Iterables.concat(Iterables.filter(getStorefiles(), sf -> !newCompactedfiles.contains(sf)),
+        results)).asList();
+    Preconditions.checkArgument(!CollectionUtils.isEmpty(storefiles),
+      "storefiles cannot be empty when adding compaction results");
+
+    ImmutableList<HStoreFile> compactedfiles = ImmutableSortedSet
+      .copyOf(getStoreFileComparator(),
+        Iterables.concat(getCompactedfiles(), newCompactedfiles))
+      .asList();
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(storefiles).build());
+    setStorefiles(storefiles);
+    setCompactedfiles(compactedfiles);
+    newCompactedfiles.forEach(HStoreFile::markCompactedAway);
+  }
+
+  void updatePathListToTracker(StoreFilePathUpdate storeFilePathUpdate) throws IOException {
+    try {
+      if (!readOnly) {
+        accessor.writeStoreFilePaths(tableName, regionName, storeName, storeFilePathUpdate);
+      }
+    } catch (IOException e) {
+      String message = "Failed to update Path list of " + tableName + "-" + regionName +
+        "-" + storeName + ", on " + TableName.STOREFILE_STR + ". The new files are not "
+        + "persistent and will be removed from " + regionName + "," + storeName +
+        ".\nFailed update: " + storeFilePathUpdate;
+      LOG.warn(message);

Review comment:
       String.format or the parameterized log message, please.
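       e.g. something like this (sketch; passing the exception as the last argument so SLF4J
       still logs the stack trace):

           LOG.warn("Failed to update Path list of {}-{}-{} on {}. The new files are not"
               + " persisted and will be removed from {},{}. Failed update: {}",
             tableName, regionName, storeName, TableName.STOREFILE_STR, regionName, storeName,
             storeFilePathUpdate, e);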

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSortedSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * A Storefile manager that is used by {@link PersistedStoreEngine} that persists the in-memory
+ * storefile tracking to a persistent table hbase:storefile.
+ *
+ * We don't override {@link #clearFiles()} from {@link DefaultStoreFileManager} to also clear
+ * the persisted storefile tracking; the persisted tracking is kept so it can be reused when
+ * the region is reassigned to a different region server.
+ */
+@InterfaceAudience.Private
+public class PersistedStoreFileManager extends DefaultStoreFileManager {
+  private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFileManager.class);
+  private final RegionInfo regionInfo;
+  private final String tableName;
+  private final String regionName;
+  private final String storeName;
+  private StoreFilePathAccessor accessor;
+  private Configuration conf;
+  // only used for warmupHRegion
+  private boolean readOnly;
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor, boolean readOnly) {
+    super(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, familyName);
+    this.conf = conf;
+    this.regionInfo = regionInfo;
+    this.tableName = regionInfo.getTable().getNameAsString();
+    this.regionName = regionInfo.getEncodedName();
+    this.storeName = familyName;
+    this.accessor = accessor;
+    this.readOnly = readOnly;
+  }
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor) {
+    this(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, regionInfo,
+      familyName, accessor, false);
+  }
+
+  /**
+   * Loads the specified storeFiles to the StoreFileManager to be included in reads.
+   *
+   * @param storeFiles list of storefiles to be loaded; may be an empty list. Throws an
+   *                   exception if it is null.
+   */
+  @Override
+  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
+    Preconditions.checkArgument(storeFiles != null, "store files cannot be "
+      + "null when loading");
+    if (storeFiles.isEmpty()) {
+      LOG.warn("Other than fresh region with no store files, store files should not be empty");
+      return;
+    }
+    ImmutableList<HStoreFile> sortedStorefiles =
+      ImmutableList.sortedCopyOf(getStoreFileComparator(), storeFiles);
+    setStorefiles(sortedStorefiles);
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(sortedStorefiles).build());
+  }
+
+  @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    List<Path> pathList = accessor.getIncludedStoreFilePaths(tableName, regionName, storeName);
+    boolean isEmptyInPersistedFilePaths = CollectionUtils.isEmpty(pathList);

Review comment:
       Is there a way we can definitively know that our storefile paths are uninitialized and we need to perform this action, vs. an empty store?
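       For example (sketch, assuming a hypothetical accessor contract where null means the
       tracking row was never written and an empty list means the store is tracked but has no
       files; the current Get-based implementation returns an empty list in both cases):

           List<Path> pathList = accessor.getIncludedStoreFilePaths(tableName, regionName, storeName);
           if (pathList == null) {
             // tracking never initialized for this store: fall back to a one-time filesystem scan
             return getRegionFs().getStoreFiles(getFamilyName());
           }
           // tracking exists but the store is genuinely empty: no rescan needed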

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessorTestBase.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+public abstract class StoreFilePathAccessorTestBase {
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected StoreFilePathAccessor storeFilePathAccessor;
+  protected static String REGION_NAME = UUID.randomUUID().toString().replaceAll("-", "");
+  protected static String STORE_NAME = UUID.randomUUID().toString();
+  protected static List<Path> EMPTY_PATH = Collections.emptyList();
+  protected static List<Path> INCLUDE_EXAMPLE_PATH =
+    Lists.newArrayList(new Path("hdfs://foo/bar1"), new Path("hdfs://foo/bar2"));

Review comment:
       final?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
##########
@@ -203,5 +213,21 @@ public double getCompactionPressure() {
   public Comparator<HStoreFile> getStoreFileComparator() {
     return storeFileComparator;
   }
+
+  void setStorefiles(ImmutableList<HStoreFile> storefiles) {
+    this.storefiles = storefiles;
+  }
+
+  void setCompactedfiles(ImmutableList<HStoreFile> compactedfiles) {

Review comment:
       I'd challenge you to see if there's a better API we could add to DefaultStoreFileManager than this. It's a little brittle in that PersistedStoreFileManager has to know when to call these setters. Maybe there's an API where a single method in this class computes and sets both the compactedFiles and the "current" files, which both this implementation and the new one could leverage?
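       For instance, a single update method here (sketch; method names hypothetical) that
       installs both lists and gives subclasses one persistence hook, instead of exposing raw
       setters:

           // In DefaultStoreFileManager: compute-and-install in one place.
           protected final void updateStoreFiles(ImmutableList<HStoreFile> newStorefiles,
               ImmutableList<HStoreFile> newCompactedfiles) throws IOException {
             onStoreFilesChanged(newStorefiles, newCompactedfiles); // persist first, then mutate
             this.storefiles = newStorefiles;
             this.compactedfiles = newCompactedfiles;
           }

           // no-op in the default implementation; PersistedStoreFileManager would override this
           // to write the new list to hbase:storefile
           protected void onStoreFilesChanged(ImmutableList<HStoreFile> storefiles,
               ImmutableList<HStoreFile> compactedfiles) throws IOException {
           }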

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Helper class to interact with the hbase storefile tracking data persisted as off-memory data
+ * from the {@link StoreFileManager}
+ *
+ * There is only one set of tracked storefiles, 'included'.
+ *
+ * e.g. the list of storefile paths in 'included' should be an identical copy of the in-memory
+ * {@link HStoreFile} Path(s) and can be reused during region opens and region reassignment.
+ */
+@InterfaceAudience.Private
+public interface StoreFilePathAccessor {
+
+  /**
+   * Create the storefile tracking with the help of the masterServices
+   * @param masterServices instance of HMaster
+   * @throws IOException if Master is not running or connection has been lost
+   */
+  void initialize(final MasterServices masterServices) throws IOException;
+
+  /**
+   * GET storefile paths from the 'included' data set
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @return list of StoreFile paths that should be included in reads in this store,
+   *         returns an empty list if the target cell is empty or doesn't exist.
+   * @throws IOException if a remote or network exception occurs during Get
+   */
+  List<Path> getIncludedStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException;
+
+  /**
+   * Writes the specified updates to the tracking

Review comment:
       What is a "write" in this context? Defining what the total set of files for this Region+Store is supposed to be? Not clear from javadoc.
   
   The `storeFilePathUpdate` argument javadoc is also a little vague (i.e. what's an "update"?)
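       e.g. something along these lines (sketch):

           /**
            * Persists the complete set of 'included' storefile paths for the given store,
            * replacing whatever list was previously tracked. A "write" here is a full overwrite
            * of the tracked list for this table/region/store, not an incremental append; the
            * {@code storeFilePathUpdate} carries the entire post-change set of files.
            */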




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org