Posted to commits@hbase.apache.org by ha...@apache.org on 2021/11/18 13:59:50 UTC

[hbase] branch branch-2.4 updated: HBASE-26421 Use HFileLink file to replace entire file's reference when splitting (#3854)

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 24b7008  HBASE-26421 Use HFileLink file to replace entire file's reference when splitting (#3854)
24b7008 is described below

commit 24b700848f2c766750044b31a2a88dc777f20a92
Author: Xiaolin Ha <ha...@apache.org>
AuthorDate: Thu Nov 18 21:37:10 2021 +0800

    HBASE-26421 Use HFileLink file to replace entire file's reference when splitting (#3854)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../java/org/apache/hadoop/hbase/io/HFileLink.java |  32 +++++-
 .../assignment/SplitTableRegionProcedure.java      |  28 +++--
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   4 +-
 .../hbase/regionserver/HRegionFileSystem.java      |  49 +++++++-
 .../java/org/apache/hadoop/hbase/util/FSUtils.java |  65 ++++++++---
 .../hadoop/hbase/regionserver/TestHStoreFile.java  |  25 +++--
 .../TestSplitTransactionOnCluster.java             | 124 ++++++++++++++++++++-
 7 files changed, 277 insertions(+), 50 deletions(-)
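
Editor's note: before this change, a split always produced top/bottom Reference files, even when the split row lay at or beyond one boundary of a store file, so one daughter's Reference covered the entire file anyway. The commit detects that case and writes an HFileLink to the whole file instead; unlike a Reference, an HFileLink can itself be split again immediately. A minimal sketch of the decision, mirroring the branch added to HRegionFileSystem.splitStoreFile below (splitKey, firstKey, lastKey, f and top are as in that diff):

    // top == true: the daughter takes keys >= splitRow. If splitRow is at or
    // before the file's first key, the whole file belongs to this daughter.
    if (top) {
      if (firstKey.isPresent() && f.getComparator().compare(splitKey, firstKey.get()) <= 0) {
        createLinkFile = true; // link the entire file, no half Reference needed
      }
    } else {
      // The bottom daughter takes keys < splitRow; if splitRow is at or past
      // the file's last key, the whole file belongs to this daughter.
      if (lastKey.isPresent() && f.getComparator().compare(splitKey, lastKey.get()) >= 0) {
        createLinkFile = true;
      }
    }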

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
index e2c80b2..74836ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
@@ -79,8 +79,7 @@ public class HFileLink extends FileLink {
         RegionInfoBuilder.ENCODED_REGION_NAME_REGEX, StoreFileInfo.HFILE_NAME_REGEX);
 
   /** Define the HFile Link name parser in the form of: table=region-hfile */
-  //made package private for testing
-  static final Pattern LINK_NAME_PATTERN =
+  public static final Pattern LINK_NAME_PATTERN =
     Pattern.compile(String.format("^(?:(%s)(?:\\=))?(%s)=(%s)-(%s)$",
       TableName.VALID_NAMESPACE_REGEX, TableName.VALID_TABLE_QUALIFIER_REGEX,
       RegionInfoBuilder.ENCODED_REGION_NAME_REGEX, StoreFileInfo.HFILE_NAME_REGEX));
@@ -400,15 +399,40 @@ public class HFileLink extends FileLink {
     String tableName = CommonFSUtils.getTableName(dstFamilyPath.getParent().getParent())
         .getNameAsString();
 
+    return create(conf, fs, dstFamilyPath, familyName, tableName, regionName, linkedTable,
+      linkedRegion, hfileName, createBackRef);
+  }
+
+  /**
+   * Create a new HFileLink
+   *
+   * <p>It also adds a back-reference to the hfile back-reference directory
+   * to simplify the reference-count and the cleaning process.
+   * @param conf {@link Configuration} to read for the archive directory name
+   * @param fs {@link FileSystem} on which to write the HFileLink
+   * @param dstFamilyPath - Destination path (table/region/cf/)
+   * @param dstTableName - Destination table name
+   * @param dstRegionName - Destination region name
+   * @param linkedTable - Linked Table Name
+   * @param linkedRegion - Linked Region Name
+   * @param hfileName - Linked HFile name
+   * @param createBackRef - Whether back reference should be created. Defaults to true.
+   * @return true if the link file is created; false means the file already exists.
+   * @throws IOException on file or parent directory creation failure
+   */
+  public static boolean create(final Configuration conf, final FileSystem fs,
+      final Path dstFamilyPath, final String familyName, final String dstTableName,
+      final String dstRegionName, final TableName linkedTable, final String linkedRegion,
+      final String hfileName, final boolean createBackRef) throws IOException {
     String name = createHFileLinkName(linkedTable, linkedRegion, hfileName);
-    String refName = createBackReferenceName(tableName, regionName);
+    String refName = createBackReferenceName(dstTableName, dstRegionName);
 
     // Make sure the destination directory exists
     fs.mkdirs(dstFamilyPath);
 
     // Make sure the FileLink reference directory exists
     Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
-          linkedTable, linkedRegion, familyName);
+        linkedTable, linkedRegion, familyName);
     Path backRefPath = null;
     if (createBackRef) {
       Path backRefssDir = getBackReferencesDir(archiveStoreDir, hfileName);
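
Editor's note: the create() overload made public above takes the destination table and region names explicitly instead of deriving them from dstFamilyPath, so splitStoreFile can name the daughter directly. A hedged usage sketch (every name below is made up for illustration):

    // Hypothetical values; the real caller is HRegionFileSystem.splitStoreFile.
    boolean created = HFileLink.create(conf, fs,
        dstFamilyPath,               // e.g. .../data/default/t1/<daughterRegion>/cf
        "cf",                        // familyName
        "t1",                        // dstTableName
        "daughterEncodedName",       // dstRegionName
        TableName.valueOf("t1"),     // linkedTable: table that owns the hfile
        "parentEncodedName",         // linkedRegion
        "0123456789abcdef",          // hfileName
        true);                       // createBackRef: also write the back reference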
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 09ac827..165be7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -141,13 +142,14 @@ public class SplitTableRegionProcedure
         .setSplit(false)
         .setRegionId(rid)
         .build();
-    if(tableDescriptor.getRegionSplitPolicyClassName() != null) {
+
+    if (tableDescriptor.getRegionSplitPolicyClassName() != null) {
       // Since we don't have region reference here, creating the split policy instance without it.
       // This can be used to invoke methods which don't require Region reference. This instantiation
       // of a class on Master-side though it only makes sense on the RegionServer-side is
       // for Phoenix Local Indexing. Refer HBASE-12583 for more information.
       Class<? extends RegionSplitPolicy> clazz =
-        RegionSplitPolicy.getSplitPolicyClass(tableDescriptor, conf);
+          RegionSplitPolicy.getSplitPolicyClass(tableDescriptor, conf);
       this.splitPolicy = ReflectionUtils.newInstance(clazz, conf);
     }
   }
@@ -622,17 +624,17 @@ public class SplitTableRegionProcedure
 
     Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
 
-    assertReferenceFileCount(fs, expectedReferences.getFirst(),
+    assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
       regionFs.getSplitsDir(daughterOneRI));
     //Move the files from the temporary .splits to the final /table/region directory
     regionFs.commitDaughterRegion(daughterOneRI);
-    assertReferenceFileCount(fs, expectedReferences.getFirst(),
+    assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
       new Path(tabledir, daughterOneRI.getEncodedName()));
 
-    assertReferenceFileCount(fs, expectedReferences.getSecond(),
+    assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
       regionFs.getSplitsDir(daughterTwoRI));
     regionFs.commitDaughterRegion(daughterTwoRI);
-    assertReferenceFileCount(fs, expectedReferences.getSecond(),
+    assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
       new Path(tabledir, daughterTwoRI.getEncodedName()));
   }
 
@@ -758,11 +760,15 @@ public class SplitTableRegionProcedure
     return new Pair<Integer, Integer>(daughterA, daughterB);
   }
 
-  private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount,
-      final Path dir) throws IOException {
-    if (expectedReferenceFileCount != 0 &&
-        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) {
-      throw new IOException("Failing split. Expected reference file count isn't equal.");
+  private void assertSplitResultFilesCount(final FileSystem fs,
+      final int expectedSplitResultFileCount, Path dir)
+    throws IOException {
+    if (expectedSplitResultFileCount != 0) {
+      int resultFileCount = FSUtils.getRegionReferenceAndLinkFileCount(fs, dir);
+      if (expectedSplitResultFileCount != resultFileCount) {
+        throw new IOException("Failing split. Didn't have expected reference and HFileLink files"
+            + ", expected=" + expectedSplitResultFileCount + ", actual=" + resultFileCount);
+      }
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ee9026e..401c16e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1106,8 +1106,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
-    LOG.info("Opened {}; next sequenceid={}; {}, {}",
-      this.getRegionInfo().getShortNameToLog(), nextSeqId, this.splitPolicy, this.flushPolicy);
+    LOG.info("Opened {}; next sequenceid={}; {}, {}", this.getRegionInfo().getShortNameToLog(),
+        nextSeqId, this.splitPolicy, this.flushPolicy);
 
     // A region can be reopened if failed a split; reset flags
     this.closing.set(false);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 7e22a51..2ef4d951 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.io.HFileLink.LINK_NAME_PATTERN;
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.regex.Matcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,11 +41,13 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -53,7 +57,6 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -698,15 +701,18 @@ public class HRegionFileSystem {
    */
   public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte[] splitRow,
       boolean top, RegionSplitPolicy splitPolicy) throws IOException {
+    boolean createLinkFile = false;
+    Path splitDir = new Path(getSplitsDir(hri), familyName);
     if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
       // Check whether the split row lies in the range of the store file
       // If it is outside the range, return directly.
       f.initReader();
       try {
         Cell splitKey = PrivateCellUtil.createFirstOnRow(splitRow);
+        Optional<Cell> lastKey = f.getLastKey();
+        Optional<Cell> firstKey = f.getFirstKey();
         if (top) {
           //check if larger than last key.
-          Optional<Cell> lastKey = f.getLastKey();
           // If lastKey is null means storefile is empty.
           if (!lastKey.isPresent()) {
             return null;
@@ -714,9 +720,12 @@ public class HRegionFileSystem {
           if (f.getComparator().compare(splitKey, lastKey.get()) > 0) {
             return null;
           }
+          if (firstKey.isPresent() && f.getComparator().compare(splitKey, firstKey.get()) <= 0) {
+            LOG.debug("Will create HFileLink file for {}, top=true", f.getPath());
+            createLinkFile = true;
+          }
         } else {
           //check if smaller than first key
-          Optional<Cell> firstKey = f.getFirstKey();
           // If firstKey is null means storefile is empty.
           if (!firstKey.isPresent()) {
             return null;
@@ -724,13 +733,45 @@ public class HRegionFileSystem {
           if (f.getComparator().compare(splitKey, firstKey.get()) < 0) {
             return null;
           }
+          if (lastKey.isPresent() && f.getComparator().compare(splitKey, lastKey.get()) >= 0) {
+            LOG.debug("Will create HFileLink file for {}, top=false", f.getPath());
+            createLinkFile = true;
+          }
         }
       } finally {
         f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
       }
     }
 
-    Path splitDir = new Path(getSplitsDir(hri), familyName);
+    if (createLinkFile) {
+      // create HFileLink file instead of Reference file for child
+      String hfileName = f.getPath().getName();
+      TableName linkedTable = regionInfoForFs.getTable();
+      String linkedRegion = regionInfoForFs.getEncodedName();
+      try {
+        if (HFileLink.isHFileLink(hfileName)) {
+          Matcher m = LINK_NAME_PATTERN.matcher(hfileName);
+          if (!m.matches()) {
+            throw new IllegalArgumentException(hfileName + " is not a valid HFileLink name!");
+          }
+          linkedTable = TableName.valueOf(m.group(1), m.group(2));
+          linkedRegion = m.group(3);
+          hfileName = m.group(4);
+        }
+        // must create back reference here
+        HFileLink.create(conf, fs, splitDir, familyName, hri.getTable().getNameAsString(),
+          hri.getEncodedName(), linkedTable, linkedRegion, hfileName, true);
+        Path path =
+          new Path(splitDir, HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName));
+        LOG.info("Created linkFile:" + path.toString() + " for child: " + hri.getEncodedName()
+          + ", parent: " + regionInfoForFs.getEncodedName());
+        return path;
+      } catch (IOException e) {
+        // if creating the HFileLink failed, log the error and fall back to a Reference file
+        LOG.error("Create link file for " + hfileName + " for child " + hri.getEncodedName()
+          + " failed, will create Reference file", e);
+      }
+    }
     // A reference to the bottom half of the hsf store file.
     Reference r =
       top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
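
Editor's note on the chained-link handling in the hunk above: when the file being split is already an HFileLink, its name is parsed with the now-public LINK_NAME_PATTERN so the daughter's new link points at the original hfile rather than at another link. A small sketch of that parse (the link name below is fabricated):

    // Link name form: [namespace=]table=encodedRegionName-hfileName
    Matcher m = HFileLink.LINK_NAME_PATTERN.matcher(
        "ns1=t1=d41d8cd98f00b204e9800998ecf8427e-0123456789abcdef");
    if (m.matches()) {
      TableName linkedTable = TableName.valueOf(m.group(1), m.group(2)); // ns1:t1
      String linkedRegion = m.group(3); // encoded parent region name
      String hfileName = m.group(4);    // the underlying hfile
    }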
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 91775b9..c8e77ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1049,28 +1049,65 @@ public final class FSUtils {
    * @return List of paths to valid family directories in region dir.
    * @throws IOException
    */
-  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
+      throws IOException {
     // assumes we are in a region dir.
-    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
-    List<Path> familyDirs = new ArrayList<>(fds.length);
+    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
+  }
+
+  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
+      throws IOException {
+    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
+  }
+
+  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
+      throws IOException {
+    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
+  }
+
+  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
+      final PathFilter pathFilter) throws IOException {
+    FileStatus[] fds = fs.listStatus(dir, pathFilter);
+    List<Path> files = new ArrayList<>(fds.length);
     for (FileStatus fdfs: fds) {
       Path fdPath = fdfs.getPath();
-      familyDirs.add(fdPath);
+      files.add(fdPath);
     }
-    return familyDirs;
+    return files;
   }
 
-  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
-    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
-    if (fds == null) {
-      return Collections.emptyList();
+  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
+    int result = 0;
+    try {
+      for (Path familyDir : getFamilyDirs(fs, p)) {
+        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
+      }
+    } catch (IOException e) {
+      LOG.warn("Error Counting reference files.", e);
     }
-    List<Path> referenceFiles = new ArrayList<>(fds.size());
-    for (FileStatus fdfs: fds) {
-      Path fdPath = fdfs.getPath();
-      referenceFiles.add(fdPath);
+    return result;
+  }
+
+  public static class ReferenceAndLinkFileFilter implements PathFilter {
+
+    private final FileSystem fs;
+
+    public ReferenceAndLinkFileFilter(FileSystem fs) {
+      this.fs = fs;
+    }
+
+    @Override
+    public boolean accept(Path rd) {
+      try {
+        // only files can be references.
+        return !fs.getFileStatus(rd).isDirectory() && (StoreFileInfo.isReference(rd) ||
+          HFileLink.isHFileLink(rd));
+      } catch (IOException ioe) {
+        // Maybe the file was moved or the fs was disconnected.
+        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        return false;
+      }
     }
-    return referenceFiles;
   }
 
   /**
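
Editor's note: assertSplitResultFilesCount in SplitTableRegionProcedure (earlier in this patch) relies on the new helper above; since a daughter may now contain HFileLinks as well as References, the verification counts both. A hedged sketch of a call (the directory is illustrative):

    // Counts Reference and HFileLink files across every family dir of a region.
    Path daughterDir = new Path(tableDir, daughterRegion.getEncodedName()); // illustrative
    int n = FSUtils.getRegionReferenceAndLinkFileCount(fs, daughterDir);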
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
index b7d0db0..8ddb93a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
@@ -115,15 +115,18 @@ public class TestHStoreFile extends HBaseTestCase {
   }
 
   /**
-   * Write a file and then assert that we can read from top and bottom halves
-   * using two HalfMapFiles.
+   * Write a file and then assert that we can read from top and bottom halves using two
+   * HalfMapFiles, as well as one HalfMapFile and one HFileLink file.
    */
   @Test
-  public void testBasicHalfMapFile() throws Exception {
+  public void testBasicHalfAndHFileLinkMapFile() throws Exception {
     final HRegionInfo hri =
-        new HRegionInfo(TableName.valueOf("testBasicHalfMapFileTb"));
-    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
-      conf, fs, new Path(testDir, hri.getTable().getNameAsString()), hri);
+        new HRegionInfo(TableName.valueOf("testBasicHalfAndHFileLinkMapFile"));
+    // The locations of the hfiles referenced by HFileLinks must be consistent with the
+    // table dir created by CommonFSUtils, so we create the region directory under
+    // CommonFSUtils.getTableDir here.
+    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
+      CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), hri.getTable()), hri);
 
     HFileContext meta = new HFileContextBuilder().withBlockSize(2*1024).build();
     StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
@@ -142,7 +145,7 @@ public class TestHStoreFile extends HBaseTestCase {
   }
 
   // pick an split point (roughly halfway)
-  byte[] SPLITKEY = new byte[] { (LAST_CHAR + FIRST_CHAR)/2, FIRST_CHAR};
+  byte[] SPLITKEY = new byte[] { (LAST_CHAR + FIRST_CHAR) / 2, FIRST_CHAR };
 
   /*
    * Writes HStoreKey and ImmutableBytes data to passed writer and
@@ -383,8 +386,10 @@ public class TestHStoreFile extends HBaseTestCase {
       throws IOException {
     f.initReader();
     Cell midkey = f.getReader().midKey().get();
-    KeyValue midKV = (KeyValue)midkey;
-    byte [] midRow = CellUtil.cloneRow(midKV);
+    KeyValue midKV = (KeyValue) midkey;
+    // 1. test using the midRow as the splitKey; this will generate two Reference files
+    // in the children
+    byte[] midRow = CellUtil.cloneRow(midKV);
     // Create top split.
     HRegionInfo topHri = new HRegionInfo(regionFs.getRegionInfo().getTable(), null, midRow);
     Path topPath = splitStoreFile(regionFs, topHri, TEST_FAMILY, f, midRow, true);
@@ -446,7 +451,7 @@ public class TestHStoreFile extends HBaseTestCase {
       regionFs.cleanupDaughterRegion(topHri);
       regionFs.cleanupDaughterRegion(bottomHri);
 
-      // Next test using a midkey that does not exist in the file.
+      // 2. test using a midkey which will generate one Reference file and one HFileLink file.
       // First, do a key that is < than first key. Ensure splits behave
       // properly.
       byte [] badmidkey = Bytes.toBytes("  .");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index b591983..8738b9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.client.TableDescriptorBuilder.SPLIT_POLICY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -41,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
@@ -75,6 +78,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
@@ -139,13 +143,13 @@ public class TestSplitTransactionOnCluster {
   private MiniHBaseCluster cluster = null;
   private static final int NB_SERVERS = 3;
 
-  static final HBaseTestingUtility TESTING_UTIL =
-    new HBaseTestingUtility();
+  static final HBaseTestingUtility TESTING_UTIL = new HBaseTestingUtility();
 
   @Rule
   public TestName name = new TestName();
 
-  @BeforeClass public static void before() throws Exception {
+  @BeforeClass
+  public static void before() throws Exception {
     TESTING_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_PERIOD, 60000);
     StartMiniClusterOption option = StartMiniClusterOption.builder()
         .masterClass(MyMaster.class).numRegionServers(NB_SERVERS).
@@ -153,11 +157,13 @@ public class TestSplitTransactionOnCluster {
     TESTING_UTIL.startMiniCluster(option);
   }
 
-  @AfterClass public static void after() throws Exception {
+  @AfterClass
+  public static void after() throws Exception {
     TESTING_UTIL.shutdownMiniCluster();
   }
 
-  @Before public void setup() throws IOException {
+  @Before
+  public void setup() throws IOException {
     TESTING_UTIL.ensureSomeNonStoppedRegionServersAvailable(NB_SERVERS);
     this.admin = TESTING_UTIL.getAdmin();
     this.cluster = TESTING_UTIL.getMiniHBaseCluster();
@@ -360,6 +366,114 @@ public class TestSplitTransactionOnCluster {
     admin.deleteTable(tableName);
   }
 
+  @Test
+  public void testContinuousSplitUsingLinkFile() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    // Create table then get the single region for our new table.
+    byte[] cf = Bytes.toBytes("cf");
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
+    String splitPolicy = ConstantSizeRegionSplitPolicy.class.getName();
+    builder.setValue(SPLIT_POLICY, splitPolicy);
+
+    admin.createTable(builder.build());
+    admin.compactionSwitch(false, new ArrayList<>());
+
+    assertNotEquals("Unable to retrieve regions of the table", -1,
+      TESTING_UTIL.waitFor(10000, () -> cluster.getRegions(tableName).size() == 1));
+    Table table = TESTING_UTIL.getConnection().getTable(tableName);
+    // insert data
+    insertData(tableName, admin, table, 10);
+    insertData(tableName, admin, table, 20);
+    insertData(tableName, admin, table, 40);
+    int rowCount = 3 * 4;
+    Scan scan = new Scan();
+    scanValidate(scan, rowCount, table);
+
+    // Split
+    admin.splitRegionAsync(cluster.getRegions(tableName).get(0).getRegionInfo().getRegionName(),
+      Bytes.toBytes("row14"));
+    // wait for the split to complete or get interrupted. If the split completes
+    // successfully the procedure returns true; if it fails, it throws an exception.
+    Thread.sleep(3000);
+    assertNotEquals("Table is not split properly?", -1,
+      TESTING_UTIL.waitFor(3000, () -> cluster.getRegions(tableName).size() == 2));
+    // we have 2 daughter regions
+    HRegion hRegion1 = cluster.getRegions(tableName).get(0);
+    HRegion hRegion2 = cluster.getRegions(tableName).get(1);
+    HStore hStore1 = hRegion1.getStore(cf);
+    HStore hStore2 = hRegion2.getStore(cf);
+    // the store file counts of the two children should sum to the parent's count
+    assertEquals(3, hStore1.getStorefilesCount() + hStore2.getStorefilesCount());
+    // both children should contain only link files
+    for (StoreFile sf : hStore1.getStorefiles()) {
+      assertTrue(HFileLink.isHFileLink(sf.getPath()));
+    }
+    for (StoreFile sf : hStore2.getStorefiles()) {
+      assertTrue(HFileLink.isHFileLink(sf.getPath()));
+    }
+    // validate children data
+    scan = new Scan();
+    scanValidate(scan, rowCount, table);
+
+    // Continuous split
+    findRegionToSplit(tableName, "row24");
+    Thread.sleep(3000);
+    assertNotEquals("Table is not split properly?", -1,
+      TESTING_UTIL.waitFor(3000, () -> cluster.getRegions(tableName).size() == 3));
+    // now the table has 3 regions, each of which should have one link file
+    for (HRegion newRegion : cluster.getRegions(tableName)) {
+      assertEquals(1, newRegion.getStore(cf).getStorefilesCount());
+      assertTrue(
+        HFileLink.isHFileLink(newRegion.getStore(cf).getStorefiles().iterator().next().getPath()));
+    }
+
+    scan = new Scan();
+    scanValidate(scan, rowCount, table);
+
+    // Continuous split, this time splitting an HFileLink within its key range, which
+    // generates Reference files. After this we cannot split again, since References exist.
+    findRegionToSplit(tableName, "row11");
+    Thread.sleep(3000);
+    assertNotEquals("Table is not split properly?", -1,
+      TESTING_UTIL.waitFor(3000, () -> cluster.getRegions(tableName).size() == 4));
+
+    scan = new Scan();
+    scanValidate(scan, rowCount, table);
+  }
+
+  private void findRegionToSplit(TableName tableName, String splitRowKey) throws Exception {
+    HRegion toSplit = null;
+    byte[] toSplitKey = Bytes.toBytes(splitRowKey);
+    for (HRegion rg : cluster.getRegions(tableName)) {
+      LOG.debug("startKey=" +
+        Bytes.toStringBinary(rg.getRegionInfo().getStartKey()) + ", getEndKey()=" +
+        Bytes.toStringBinary(rg.getRegionInfo().getEndKey()) + ", row=" + splitRowKey);
+      if ((rg.getRegionInfo().getStartKey().length == 0 ||
+          CellComparator.getInstance().compare(
+            PrivateCellUtil.createFirstOnRow(rg.getRegionInfo().getStartKey()),
+            PrivateCellUtil.createFirstOnRow(toSplitKey)) <= 0) &&
+        (rg.getRegionInfo().getEndKey().length == 0 ||
+          CellComparator.getInstance().compare(
+            PrivateCellUtil.createFirstOnRow(rg.getRegionInfo().getEndKey()),
+            PrivateCellUtil.createFirstOnRow(toSplitKey)) >= 0)) {
+        toSplit = rg;
+      }
+    }
+    assertNotNull(toSplit);
+    admin.splitRegionAsync(toSplit.getRegionInfo().getRegionName(), toSplitKey);
+  }
+
+  private static void scanValidate(Scan scan, int expectedRowCount, Table table) throws IOException {
+    ResultScanner scanner = table.getScanner(scan);
+    int rows = 0;
+    for (Result result : scanner) {
+      rows++;
+    }
+    scanner.close();
+    assertEquals(expectedRowCount, rows);
+  }
+
   public static class FailingSplitMasterObserver implements MasterCoprocessor, MasterObserver {
     volatile CountDownLatch latch;