You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/10/14 15:17:37 UTC
[hbase] 03/09: HBASE-26079 Use StoreFileTracker when splitting and
merging (#3617)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26067
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 708b7c143df74d3ffe21623295ca1f27cbc9b8ed
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Wed Sep 8 10:31:49 2021 +0100
HBASE-26079 Use StoreFileTracker when splitting and merging (#3617)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../assignment/MergeTableRegionsProcedure.java | 23 +-
.../assignment/SplitTableRegionProcedure.java | 41 ++--
.../hbase/regionserver/HRegionFileSystem.java | 42 +++-
.../storefiletracker/DefaultStoreFileTracker.java | 4 +-
.../storefiletracker/StoreFileTracker.java | 1 -
.../storefiletracker/StoreFileTrackerFactory.java | 33 ++-
.../hbase/regionserver/TestDefaultStoreEngine.java | 1 +
.../regionserver/TestDirectStoreSplitsMerges.java | 32 ++-
.../hadoop/hbase/regionserver/TestHStoreFile.java | 19 +-
.../regionserver/TestMergesSplitsAddToTracker.java | 262 +++++++++++++++++++++
.../hbase/regionserver/TestStripeStoreEngine.java | 1 +
.../storefiletracker/TestStoreFileTracker.java} | 42 ++--
12 files changed, 436 insertions(+), 65 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index fb57cb9..4b25a28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -53,6 +55,8 @@ import org.apache.hadoop.hbase.quotas.QuotaExceededException;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -584,40 +588,47 @@ public class MergeTableRegionsProcedure
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();
-
+ List<Path> mergedFiles = new ArrayList<>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
env.getMasterConfiguration(), fs, tableDir, mergedRegion);
for (RegionInfo ri: this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tableDir, ri, false);
- mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
+ mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
}
assert mergeRegionFs != null;
- mergeRegionFs.commitMergedRegion();
+ mergeRegionFs.commitMergedRegion(mergedFiles, env);
// Prepare to create merged regions
env.getAssignmentManager().getRegionStates().
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
}
- private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
+ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
.get(mergedRegion.getTable());
+ List<Path> mergedFiles = new ArrayList<>();
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
- final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+ Configuration trackerConfig =
+ StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
+ StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
+ family, regionFs);
+ final Collection<StoreFileInfo> storeFiles = tracker.load();
if (storeFiles != null && storeFiles.size() > 0) {
for (StoreFileInfo storeFileInfo : storeFiles) {
// Create reference file(s) to parent region file here in mergedDir.
// As this procedure is running on master, use CacheConfig.DISABLED means
// don't cache any block.
- mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
+ Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+ mergedFiles.add(refFile);
}
}
}
+ return mergedFiles;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 0a15e36..4d53df4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -618,21 +620,20 @@ public class SplitTableRegionProcedure
final FileSystem fs = mfs.getFileSystem();
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
-
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
- Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
+ Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);
- assertReferenceFileCount(fs, expectedReferences.getFirst(),
+ assertReferenceFileCount(fs, expectedReferences.getFirst().size(),
regionFs.getSplitsDir(daughterOneRI));
- regionFs.commitDaughterRegion(daughterOneRI);
- assertReferenceFileCount(fs, expectedReferences.getFirst(),
+ regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
+ assertReferenceFileCount(fs, expectedReferences.getFirst().size(),
new Path(tabledir, daughterOneRI.getEncodedName()));
- assertReferenceFileCount(fs, expectedReferences.getSecond(),
+ assertReferenceFileCount(fs, expectedReferences.getSecond().size(),
regionFs.getSplitsDir(daughterTwoRI));
- regionFs.commitDaughterRegion(daughterTwoRI);
- assertReferenceFileCount(fs, expectedReferences.getSecond(),
+ regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
+ assertReferenceFileCount(fs, expectedReferences.getSecond().size(),
new Path(tabledir, daughterTwoRI.getEncodedName()));
}
@@ -649,7 +650,7 @@ public class SplitTableRegionProcedure
* Create Split directory
* @param env MasterProcedureEnv
*/
- private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
+ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
final HRegionFileSystem regionFs) throws IOException {
final Configuration conf = env.getMasterConfiguration();
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
@@ -665,7 +666,11 @@ public class SplitTableRegionProcedure
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
String family = cfd.getNameAsString();
- Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
+ Configuration trackerConfig = StoreFileTrackerFactory.
+ mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName()));
+ StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
+ family, regionFs);
+ Collection<StoreFileInfo> sfis = tracker.load();
if (sfis == null) {
continue;
}
@@ -691,7 +696,7 @@ public class SplitTableRegionProcedure
}
if (nbFiles == 0) {
// no file needs to be splitted.
- return new Pair<Integer, Integer>(0, 0);
+ return new Pair<>(Collections.emptyList(), Collections.emptyList());
}
// Max #threads is the smaller of the number of storefiles or the default max determined above.
int maxThreads = Math.min(
@@ -744,14 +749,18 @@ public class SplitTableRegionProcedure
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
- int daughterA = 0;
- int daughterB = 0;
+ List<Path> daughterA = new ArrayList<>();
+ List<Path> daughterB = new ArrayList<>();
// Look for any exception
for (Future<Pair<Path, Path>> future : futures) {
try {
Pair<Path, Path> p = future.get();
- daughterA += p.getFirst() != null ? 1 : 0;
- daughterB += p.getSecond() != null ? 1 : 0;
+ if(p.getFirst() != null){
+ daughterA.add(p.getFirst());
+ }
+ if(p.getSecond() != null){
+ daughterB.add(p.getSecond());
+ }
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
@@ -764,7 +773,7 @@ public class SplitTableRegionProcedure
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
" storefiles, Daughter B: " + daughterB + " storefiles.");
}
- return new Pair<Integer, Integer>(daughterA, daughterB);
+ return new Pair<>(daughterA, daughterB);
}
private void assertReferenceFileCount(final FileSystem fs, final int expectedReferenceFileCount,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 6ab9b0f..3b053a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -23,7 +23,9 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
@@ -45,6 +47,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -592,19 +597,46 @@ public class HRegionFileSystem {
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
* @throws IOException
*/
- public Path commitDaughterRegion(final RegionInfo regionInfo)
- throws IOException {
+ public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
+ MasterProcedureEnv env) throws IOException {
Path regionDir = this.getSplitsDir(regionInfo);
if (fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
+ HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+ env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
+ insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
}
-
return regionDir;
}
+ private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
+ HRegionFileSystem regionFs) throws IOException {
+ TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors().
+ get(regionInfo.getTable());
+ //we need to map trackers per store
+ Map<String, StoreFileTracker> trackerMap = new HashMap<>();
+ //we need to map store files per store
+ Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
+ for(Path file : allFiles) {
+ String familyName = file.getParent().getName();
+ trackerMap.computeIfAbsent(familyName, t -> {
+ Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc,
+ tblDesc.getColumnFamily(Bytes.toBytes(familyName)));
+ return StoreFileTrackerFactory.
+ create(config, true, familyName, regionFs);
+ });
+ fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
+ List<StoreFileInfo> infos = fileInfoMap.get(familyName);
+ infos.add(new StoreFileInfo(conf, fs, file, true));
+ }
+ for(Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
+ entry.getValue().add(fileInfoMap.get(entry.getKey()));
+ }
+ }
+
/**
* Creates region split daughter directories under the table dir. If the daughter regions already
* exist, for example, in the case of a recovery from a previous failed split procedure, this
@@ -755,13 +787,15 @@ public class HRegionFileSystem {
* Commit a merged region, making it ready for use.
* @throws IOException
*/
- public void commitMergedRegion() throws IOException {
+ public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env)
+ throws IOException {
Path regionDir = getMergesDir(regionInfoForFs);
if (regionDir != null && fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
+ insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
index fa04481..22e0513 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,8 +33,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class DefaultStoreFileTracker extends StoreFileTrackerBase {
- public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
- StoreContext ctx) {
+ public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
index aadedc8..0a85abb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
@@ -48,7 +48,6 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public interface StoreFileTracker {
-
/**
* Load the store files list when opening a region.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
index 6cdfaf4..c446d5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
@@ -18,22 +18,51 @@
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Factory method for creating store file tracker.
*/
@InterfaceAudience.Private
public final class StoreFileTrackerFactory {
-
public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
+ private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);
public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
- StoreContext ctx) {
+ StoreContext ctx) {
Class<? extends StoreFileTracker> tracker =
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
+ LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}
+
+ public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
+ HRegionFileSystem regionFs) {
+ ColumnFamilyDescriptorBuilder fDescBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
+ StoreContext ctx = StoreContext.getBuilder().
+ withColumnFamilyDescriptor(fDescBuilder.build()).
+ withRegionFileSystem(regionFs).
+ build();
+ return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
+ }
+
+ public static Configuration mergeConfigurations(Configuration global,
+ TableDescriptor table, ColumnFamilyDescriptor family) {
+ return new CompoundConfiguration()
+ .add(global)
+ .addBytesMap(table.getValues())
+ .addStringMap(family.getConfiguration())
+ .addBytesMap(family.getValues());
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
index 3784876..523f277 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
@@ -67,6 +67,7 @@ public class TestDefaultStoreEngine {
DummyStoreFlusher.class.getName());
HRegion mockRegion = Mockito.mock(HRegion.class);
HStore mockStore = Mockito.mock(HStore.class);
+ mockStore.conf = conf;
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
StoreEngine<?, ?, ?, ?> se =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java
index bd24f1b..0eba8aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -139,7 +141,9 @@ public class TestDirectStoreSplitsMerges {
setRegionId(region.getRegionInfo().getRegionId() +
EnvironmentEdgeManager.currentTime()).build();
Path splitDir = regionFS.getSplitsDir(daughterA);
- Path result = regionFS.commitDaughterRegion(daughterA);
+ MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
+ getMasterProcedureExecutor().getEnvironment();
+ Path result = regionFS.commitDaughterRegion(daughterA, new ArrayList<>(), env);
assertEquals(splitDir, result);
}
@@ -162,14 +166,18 @@ public class TestDirectStoreSplitsMerges {
Path splitDirA = regionFS.getSplitsDir(daughterA);
Path splitDirB = regionFS.getSplitsDir(daughterB);
HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
- regionFS
+ List<Path> filesA = new ArrayList<>();
+ filesA.add(regionFS
.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
- Bytes.toBytes("002"), false, region.getSplitPolicy());
- regionFS
+ Bytes.toBytes("002"), false, region.getSplitPolicy()));
+ List<Path> filesB = new ArrayList<>();
+ filesB.add(regionFS
.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
- Bytes.toBytes("002"), true, region.getSplitPolicy());
- Path resultA = regionFS.commitDaughterRegion(daughterA);
- Path resultB = regionFS.commitDaughterRegion(daughterB);
+ Bytes.toBytes("002"), true, region.getSplitPolicy()));
+ MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
+ getMasterProcedureExecutor().getEnvironment();
+ Path resultA = regionFS.commitDaughterRegion(daughterA, filesA, env);
+ Path resultB = regionFS.commitDaughterRegion(daughterB, filesB, env);
assertEquals(splitDirA, resultA);
assertEquals(splitDirB, resultB);
}
@@ -203,8 +211,11 @@ public class TestDirectStoreSplitsMerges {
mergeFileFromRegion(mergeRegionFs, first, file);
//merge file from second region
file = (HStoreFile) second.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
- mergeFileFromRegion(mergeRegionFs, second, file);
- mergeRegionFs.commitMergedRegion();
+ List<Path> mergedFiles = new ArrayList<>();
+ mergedFiles.add(mergeFileFromRegion(mergeRegionFs, second, file));
+ MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
+ getMasterProcedureExecutor().getEnvironment();
+ mergeRegionFs.commitMergedRegion(mergedFiles, env);
}
private void waitForSplitProcComplete(int attempts, int waitTime) throws Exception {
@@ -223,11 +234,12 @@ public class TestDirectStoreSplitsMerges {
}
}
- private void mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge,
+ private Path mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge,
HStoreFile file) throws IOException {
Path mergedFile = regionFS.mergeStoreFile(regionToMerge.getRegionInfo(),
Bytes.toString(FAMILY_NAME), file);
validateResultingFile(regionToMerge.getRegionInfo().getEncodedName(), mergedFile);
+ return mergedFile;
}
private void validateResultingFile(String originalRegion, Path result){
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
index e6f0357..c7203a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -49,12 +50,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -1055,7 +1060,19 @@ public class TestHStoreFile {
if (null == path) {
return null;
}
- Path regionDir = regionFs.commitDaughterRegion(hri);
+ List<Path> splitFiles = new ArrayList<>();
+ splitFiles.add(path);
+ MasterProcedureEnv mockEnv = mock(MasterProcedureEnv.class);
+ MasterServices mockServices = mock(MasterServices.class);
+ when(mockEnv.getMasterServices()).thenReturn(mockServices);
+ when(mockEnv.getMasterConfiguration()).thenReturn(new Configuration());
+ TableDescriptors mockTblDescs = mock(TableDescriptors.class);
+ when(mockServices.getTableDescriptors()).thenReturn(mockTblDescs);
+ TableDescriptor mockTblDesc = mock(TableDescriptor.class);
+ when(mockTblDescs.get(any())).thenReturn(mockTblDesc);
+ ColumnFamilyDescriptor mockCfDesc = mock(ColumnFamilyDescriptor.class);
+ when(mockTblDesc.getColumnFamily(any())).thenReturn(mockCfDesc);
+ Path regionDir = regionFs.commitDaughterRegion(hri, splitFiles, mockEnv);
return new Path(new Path(regionDir, family), path.getName());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java
new file mode 100644
index 0000000..c6205cb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.
+ TRACK_IMPL;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestMergesSplitsAddToTracker {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMergesSplitsAddToTracker.class);
+
+ private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+ public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ //install the test tracker implementation BEFORE the cluster starts,
+ //so every store created by the mini cluster uses it
+ TEST_UTIL.getConfiguration().set(TRACK_IMPL, TestStoreFileTracker.class.getName());
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ //stop the mini cluster started in setupClass
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setup(){
+ //reset the tracker's static state so one test never sees files tracked by another
+ TestStoreFileTracker.trackedFiles = new HashMap<>();
+ }
+
+ @Test
+ public void testCommitDaughterRegion() throws Exception {
+ //commitDaughterRegion should hand the committed split files over to the store file tracker
+ TableName table = TableName.valueOf(name.getMethodName());
+ TEST_UTIL.createTable(table, FAMILY_NAME);
+ //first put some data in order to have a store file created
+ putThreeRowsAndFlush(table);
+ HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
+ HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
+ //manually build the two daughter region infos for a split at row "002"
+ RegionInfo daughterA =
+ RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey()).
+ setEndKey(Bytes.toBytes("002")).setSplit(false).
+ setRegionId(region.getRegionInfo().getRegionId() +
+ EnvironmentEdgeManager.currentTime()).
+ build();
+ RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002"))
+ .setEndKey(region.getRegionInfo().getEndKey()).setSplit(false)
+ .setRegionId(region.getRegionInfo().getRegionId()).build();
+ //create the split reference files for each daughter from the parent's single store file
+ HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
+ List<Path> splitFilesA = new ArrayList<>();
+ splitFilesA.add(regionFS
+ .splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
+ Bytes.toBytes("002"), false, region.getSplitPolicy()));
+ List<Path> splitFilesB = new ArrayList<>();
+ splitFilesB.add(regionFS
+ .splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
+ Bytes.toBytes("002"), true, region.getSplitPolicy()));
+ MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
+ getMasterProcedureExecutor().getEnvironment();
+ Path resultA = regionFS.commitDaughterRegion(daughterA, splitFilesA, env);
+ Path resultB = regionFS.commitDaughterRegion(daughterB, splitFilesB, env);
+ FileSystem fs = regionFS.getFileSystem();
+ //every committed daughter file must have been recorded by the tracker
+ verifyFilesAreTracked(resultA, fs);
+ verifyFilesAreTracked(resultB, fs);
+ }
+
+ @Test
+ public void testCommitMergedRegion() throws Exception {
+ //commitMergedRegion should hand the committed merge files over to the store file tracker
+ TableName table = TableName.valueOf(name.getMethodName());
+ TEST_UTIL.createTable(table, FAMILY_NAME);
+ //splitting the table first
+ TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
+ //Add data and flush to create files in the two different regions
+ putThreeRowsAndFlush(table);
+ List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
+ HRegion first = regions.get(0);
+ HRegion second = regions.get(1);
+ HRegionFileSystem regionFS = first.getRegionFileSystem();
+
+ //manually build the region info for the merge result spanning both parents
+ RegionInfo mergeResult =
+ RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
+ .setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
+ .setRegionId(first.getRegionInfo().getRegionId() +
+ EnvironmentEdgeManager.currentTime()).build();
+
+ HRegionFileSystem mergeFS = HRegionFileSystem.createRegionOnFileSystem(
+ TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(),
+ regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult);
+
+ List<Path> mergedFiles = new ArrayList<>();
+ //merge file from first region
+ mergedFiles.add(mergeFileFromRegion(first, mergeFS));
+ //merge file from second region
+ mergedFiles.add(mergeFileFromRegion(second, mergeFS));
+ MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster().
+ getMasterProcedureExecutor().getEnvironment();
+ mergeFS.commitMergedRegion(mergedFiles, env);
+ //validate: the merged region's files must have been recorded by the tracker
+ FileSystem fs = first.getRegionFileSystem().getFileSystem();
+ Path finalMergeDir = new Path(first.getRegionFileSystem().getTableDir(),
+ mergeResult.getEncodedName());
+ verifyFilesAreTracked(finalMergeDir, fs);
+ }
+
+ @Test
+ public void testSplitLoadsFromTracker() throws Exception {
+ //a split must only reference files known to the tracker, ignoring untracked
+ //files that happen to be present in the store directory
+ TableName table = TableName.valueOf(name.getMethodName());
+ TEST_UTIL.createTable(table, FAMILY_NAME);
+ //Add data and flush to create a store file in the single region
+ putThreeRowsAndFlush(table);
+ HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
+ //drop an untracked copy of the store file into the store dir
+ Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(region);
+ StoreFileInfo fileInfo = copyResult.getFirst();
+ String copyName = copyResult.getSecond();
+ //Now splits the region
+ TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
+ List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
+ HRegion first = regions.get(0);
+ validateDaughterRegionsFiles(first, fileInfo.getActiveFileName(), copyName);
+ HRegion second = regions.get(1);
+ validateDaughterRegionsFiles(second, fileInfo.getActiveFileName(), copyName);
+ }
+
+ @Test
+ public void testMergeLoadsFromTracker() throws Exception {
+ //a merge must only reference files known to the tracker, ignoring untracked
+ //files that happen to be present in the store directory
+ TableName table = TableName.valueOf(name.getMethodName());
+ TEST_UTIL.createTable(table, new byte[][]{FAMILY_NAME},
+ new byte[][]{Bytes.toBytes("002")});
+ //Add data and flush to create files in the two different regions
+ putThreeRowsAndFlush(table);
+ List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
+ HRegion first = regions.get(0);
+ //drop an untracked copy of the store file into the first region's store dir
+ Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(first);
+ StoreFileInfo fileInfo = copyResult.getFirst();
+ String copyName = copyResult.getSecond();
+ //Now merges the first two regions
+ TEST_UTIL.getAdmin().mergeRegionsAsync(new byte[][]{
+ first.getRegionInfo().getEncodedNameAsBytes(),
+ regions.get(1).getRegionInfo().getEncodedNameAsBytes()
+ }, true).get(10, TimeUnit.SECONDS);
+ regions = TEST_UTIL.getHBaseCluster().getRegions(table);
+ HRegion merged = regions.get(0);
+ validateDaughterRegionsFiles(merged, fileInfo.getActiveFileName(), copyName);
+ }
+
+ private Pair<StoreFileInfo,String> copyFileInTheStoreDir(HRegion region) throws IOException {
+ //returns the original (tracked) file info and the name of an untracked copy
+ //placed next to it in the store directory
+ Path storeDir = region.getRegionFileSystem().getStoreDir("info");
+ //gets the single file
+ StoreFileInfo fileInfo = region.getRegionFileSystem().getStoreFiles("info").get(0);
+ //make a copy of the valid file straight into the store dir, so that it's not tracked.
+ String copyName = UUID.randomUUID().toString().replaceAll("-", "");
+ Path copy = new Path(storeDir, copyName);
+ FileUtil.copy(region.getFilesystem(), fileInfo.getFileStatus(), region.getFilesystem(),
+ copy , false, false, TEST_UTIL.getConfiguration());
+ return new Pair<>(fileInfo, copyName);
+ }
+
+ private void validateDaughterRegionsFiles(HRegion region, String originalFileName,
+ String untrackedFile) throws IOException {
+ //the tracked original file must be referenced by the resulting region's store files,
+ //while the untracked copy must not appear there at all
+ List<StoreFileInfo> infos = region.getRegionFileSystem().getStoreFiles("info");
+ final MutableBoolean foundLink = new MutableBoolean(false);
+ infos.stream().forEach(i -> {
+ if(i.getActiveFileName().contains(untrackedFile)){
+ fail();
+ }
+ if(i.getActiveFileName().contains(originalFileName)){
+ foundLink.setTrue();
+ }
+ });
+ assertTrue(foundLink.booleanValue());
+ }
+
+ private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
+ //every file in the region's family dir must also be in the list recorded by
+ //TestStoreFileTracker; store id format matches TestStoreFileTracker's constructor
+ String storeId = regionDir.getName() + "-" + Bytes.toString(FAMILY_NAME);
+ for (FileStatus f : fs.listStatus(new Path(regionDir, Bytes.toString(FAMILY_NAME)))) {
+ assertTrue(TestStoreFileTracker.trackedFiles.get(storeId).stream()
+ .anyMatch(s -> s.getPath().equals(f.getPath())));
+ }
+ }
+
+ private Path mergeFileFromRegion(HRegion regionToMerge, HRegionFileSystem mergeFS)
+ throws IOException {
+ //delegates to HRegionFileSystem.mergeStoreFile for the region's first store file
+ //and returns the resulting path inside the merge region's dir
+ HStoreFile file = (HStoreFile) regionToMerge.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
+ return mergeFS.mergeStoreFile(regionToMerge.getRegionInfo(), Bytes.toString(FAMILY_NAME), file);
+ }
+
+ private void putThreeRowsAndFlush(TableName table) throws IOException {
+ //writes rows "001", "002" and "003" (one on each side of the "002" split point)
+ //and flushes, so that every region of the table gets at least one store file.
+ //Table is AutoCloseable: close it to avoid leaking the client resources.
+ byte[] qualifier = Bytes.toBytes("1");
+ try (Table tbl = TEST_UTIL.getConnection().getTable(table)) {
+ tbl.put(new Put(Bytes.toBytes("001")).addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1)));
+ tbl.put(new Put(Bytes.toBytes("002")).addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2)));
+ tbl.put(new Put(Bytes.toBytes("003")).addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2)));
+ }
+ TEST_UTIL.flush(table);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index eb0b1c1..80012df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -120,6 +120,7 @@ public class TestStripeStoreEngine {
private static TestStoreEngine createEngine(Configuration conf) throws Exception {
HRegion region = mock(HRegion.class);
HStore store = mock(HStore.class);
+ store.conf = conf;
when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
when(store.getHRegion()).thenReturn(region);
CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
similarity index 61%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
index fa04481..05ca1fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
@@ -18,43 +18,39 @@
package org.apache.hadoop.hbase.regionserver.storefiletracker;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * The default implementation for store file tracker, where we do not persist the store file list,
- * and use listing when loading store files.
- */
-@InterfaceAudience.Private
-class DefaultStoreFileTracker extends StoreFileTrackerBase {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
- StoreContext ctx) {
- super(conf, isPrimaryReplica, ctx);
- }
+public class TestStoreFileTracker extends DefaultStoreFileTracker {
- @Override
- public List<StoreFileInfo> load() throws IOException {
- return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
- }
+ private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class);
+ public static Map<String, List<StoreFileInfo>> trackedFiles = new HashMap<>();
+ private String storeId;
- @Override
- public boolean requireWritingToTmpDirFirst() {
- return true;
+ public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
+ super(conf, isPrimaryReplica, ctx);
+ this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString();
+ LOG.info("created storeId: {}", storeId);
+ trackedFiles.computeIfAbsent(storeId, v -> new ArrayList<>());
}
@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
- // NOOP
+ LOG.info("adding to storeId: {}", storeId);
+ trackedFiles.get(storeId).addAll(newFiles);
}
@Override
- protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
- Collection<StoreFileInfo> newFiles) throws IOException {
- // NOOP
+ public List<StoreFileInfo> load() throws IOException {
+ return trackedFiles.get(storeId);
}
}