You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/01/01 15:56:00 UTC
[hbase] 01/15: HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26067-branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit e5750efeb52f078dcc6f86a1152f200f922a7723
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Jul 29 18:35:19 2021 +0800
HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 24 +-
.../hadoop/hbase/mob/DefaultMobStoreFlusher.java | 4 +-
.../regionserver/CreateStoreFileWriterParams.java | 134 ++++
.../hbase/regionserver/DateTieredStoreEngine.java | 5 +-
.../hbase/regionserver/DefaultStoreEngine.java | 5 +-
.../hbase/regionserver/DefaultStoreFlusher.java | 11 +-
.../hadoop/hbase/regionserver/HMobStore.java | 3 +-
.../hbase/regionserver/HRegionFileSystem.java | 10 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 729 +++++----------------
.../hadoop/hbase/regionserver/StoreContext.java | 9 +
.../hadoop/hbase/regionserver/StoreEngine.java | 461 ++++++++++++-
.../hbase/regionserver/StoreFileManager.java | 9 +
.../hadoop/hbase/regionserver/StoreFlusher.java | 9 +-
.../hadoop/hbase/regionserver/StoreUtils.java | 37 +-
.../hbase/regionserver/StripeStoreEngine.java | 9 +-
.../hbase/regionserver/StripeStoreFlusher.java | 9 +-
.../compactions/AbstractMultiOutputCompactor.java | 7 +-
.../hbase/regionserver/compactions/Compactor.java | 36 +-
.../regionserver/compactions/DefaultCompactor.java | 16 +-
.../storefiletracker/DefaultStoreFileTracker.java | 61 ++
.../storefiletracker/StoreFileTracker.java | 75 +++
.../storefiletracker/StoreFileTrackerBase.java | 178 +++++
.../storefiletracker/StoreFileTrackerFactory.java | 35 +
.../util/compaction/MajorCompactionRequest.java | 1 -
.../org/apache/hadoop/hbase/TestIOFencing.java | 12 +-
.../regionserver/TestCacheOnWriteInSchema.java | 6 +-
.../hbase/regionserver/TestDefaultStoreEngine.java | 9 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 4 +-
.../hadoop/hbase/regionserver/TestHStore.java | 33 +-
.../TestRegionMergeTransactionOnCluster.java | 6 +-
.../regionserver/TestStoreFileRefresherChore.java | 3 +-
.../regionserver/TestStoreScannerClosure.java | 6 +-
.../hbase/regionserver/TestStripeStoreEngine.java | 12 +-
.../compactions/TestDateTieredCompactor.java | 16 +-
.../compactions/TestStripeCompactionPolicy.java | 12 +-
.../compactions/TestStripeCompactor.java | 16 +-
36 files changed, 1273 insertions(+), 739 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 75e2811..1b218b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -22,13 +22,12 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -80,17 +79,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
};
private final CellSinkFactory<StoreFileWriter> writerFactory =
- new CellSinkFactory<StoreFileWriter>() {
- @Override
- public StoreFileWriter createWriter(InternalScanner scanner,
- org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
- // make this writer with tags always because of possible new cells with tags.
- return store.createWriterInTmp(fd.maxKeyCount,
- major ? majorCompactionCompression : minorCompactionCompression, true, true, true,
- shouldDropBehind);
- }
- };
+ new CellSinkFactory<StoreFileWriter>() {
+ @Override
+ public StoreFileWriter createWriter(InternalScanner scanner,
+ org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+ boolean shouldDropBehind, boolean major) throws IOException {
+ // make this writer with tags always because of possible new cells with tags.
+ return store.getStoreEngine().createWriter(
+ createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+ }
+ };
public DefaultMobStoreCompactor(Configuration conf, HStore store) {
super(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index cc610e5..a52ce2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -115,8 +114,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
- writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
- false, true, true, false);
+ writer = createWriter(snapshot, true);
IOException e = null;
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
new file mode 100644
index 0000000..10cd9f0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class CreateStoreFileWriterParams {
+
+ private long maxKeyCount;
+
+ private Compression.Algorithm compression;
+
+ private boolean isCompaction;
+
+ private boolean includeMVCCReadpoint;
+
+ private boolean includesTag;
+
+ private boolean shouldDropBehind;
+
+ private long totalCompactedFilesSize = -1;
+
+ private String fileStoragePolicy = HConstants.EMPTY_STRING;
+
+ private CreateStoreFileWriterParams() {
+ }
+
+ public long maxKeyCount() {
+ return maxKeyCount;
+ }
+
+ public CreateStoreFileWriterParams maxKeyCount(long maxKeyCount) {
+ this.maxKeyCount = maxKeyCount;
+ return this;
+ }
+
+ public Compression.Algorithm compression() {
+ return compression;
+ }
+
+ /**
+ * Set the compression algorithm to use
+ */
+ public CreateStoreFileWriterParams compression(Compression.Algorithm compression) {
+ this.compression = compression;
+ return this;
+ }
+
+ public boolean isCompaction() {
+ return isCompaction;
+ }
+
+ /**
+ * Whether we are creating a new file in a compaction
+ */
+ public CreateStoreFileWriterParams isCompaction(boolean isCompaction) {
+ this.isCompaction = isCompaction;
+ return this;
+ }
+
+ public boolean includeMVCCReadpoint() {
+ return includeMVCCReadpoint;
+ }
+
+ /**
+ * Whether to include MVCC or not
+ */
+ public CreateStoreFileWriterParams includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+ this.includeMVCCReadpoint = includeMVCCReadpoint;
+ return this;
+ }
+
+ public boolean includesTag() {
+ return includesTag;
+ }
+
+ /**
+ * Whether to include tags or not
+ */
+ public CreateStoreFileWriterParams includesTag(boolean includesTag) {
+ this.includesTag = includesTag;
+ return this;
+ }
+
+ public boolean shouldDropBehind() {
+ return shouldDropBehind;
+ }
+
+ public CreateStoreFileWriterParams shouldDropBehind(boolean shouldDropBehind) {
+ this.shouldDropBehind = shouldDropBehind;
+ return this;
+ }
+
+ public long totalCompactedFilesSize() {
+ return totalCompactedFilesSize;
+ }
+
+ public CreateStoreFileWriterParams totalCompactedFilesSize(long totalCompactedFilesSize) {
+ this.totalCompactedFilesSize = totalCompactedFilesSize;
+ return this;
+ }
+
+ public String fileStoragePolicy() {
+ return fileStoragePolicy;
+ }
+
+ public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
+ this.fileStoragePolicy = fileStoragePolicy;
+ return this;
+ }
+
+ public static CreateStoreFileWriterParams create() {
+ return new CreateStoreFileWriterParams();
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 1df953d..7422d91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -19,18 +19,17 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* HBASE-15400 This store engine allows us to store data in date tiered layout with exponential
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 58f8bbb..693b9c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
@@ -39,8 +38,8 @@ import org.apache.yetus.audience.InterfaceAudience;
* their derivatives.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class DefaultStoreEngine extends StoreEngine<
- DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher,
+ RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
"hbase.hstore.defaultengine.storeflusher.class";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a7d7fb1..306760d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default implementation of StoreFlusher.
@@ -60,9 +59,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
- writer = store.createWriterInTmp(cellsCount,
- store.getColumnFamilyDescriptor().getCompressionType(), false, true,
- snapshot.isTagsPresent(), false);
+ writer = createWriter(snapshot, false);
IOException e = null;
try {
performFlush(scanner, writer, throughputController);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 3b50109..ce2b38b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -27,7 +27,6 @@ import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -158,7 +157,7 @@ public class HMobStore extends HStore {
protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
CellComparator cellComparator) throws IOException {
MobStoreEngine engine = new MobStoreEngine();
- engine.createComponents(conf, store, cellComparator);
+ engine.createComponentsOnce(conf, store, cellComparator);
return engine;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 667eabf..2f5f8d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -145,7 +145,7 @@ public class HRegionFileSystem {
// Temp Helpers
// ===========================================================================
/** @return {@link Path} to the region's temp directory, used for file creations */
- Path getTempDir() {
+ public Path getTempDir() {
return new Path(getRegionDir(), REGION_TEMP_DIR);
}
@@ -240,11 +240,7 @@ public class HRegionFileSystem {
* @param familyName Column Family Name
* @return a set of {@link StoreFileInfo} for the specified family.
*/
- public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
- return getStoreFiles(Bytes.toString(familyName));
- }
-
- public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
+ public List<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
return getStoreFiles(familyName, true);
}
@@ -254,7 +250,7 @@ public class HRegionFileSystem {
* @param familyName Column Family Name
* @return a set of {@link StoreFileInfo} for the specified family.
*/
- public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
+ public List<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
throws IOException {
Path familyDir = getStoreDir(familyName);
FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0213827..5309305 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
@@ -47,8 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -70,17 +69,12 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -110,7 +104,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
@@ -166,16 +159,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private boolean cacheOnWriteLogged;
/**
- * RWLock for store operations.
- * Locked in shared mode when the list of component stores is looked at:
- * - all reads/writes to table data
- * - checking for split
- * Locked in exclusive mode when the list of component stores is modified:
- * - closing
- * - completing a compaction
- */
- final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- /**
* Lock specific to archiving compacted store files. This avoids races around
* the combination of retrieving the list of compacted files and moving them to
* the archive directory. Since this is usually a background process (other than
@@ -283,14 +266,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
- List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
- // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
- // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
- // update the storeSize in the refreshStoreSizeAndTotalBytes() finally (just like compaction) , so
- // no need calculate the storeSize twice.
- this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
- this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
- this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);
+ storeEngine.initialize(warmup);
+ refreshStoreSizeAndTotalBytes();
flushRetriesNumber = conf.getInt(
"hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
@@ -510,105 +487,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this.dataBlockEncoder = blockEncoder;
}
- /**
- * Creates an unsorted list of StoreFile loaded in parallel
- * from the given directory.
- */
- private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
- Collection<StoreFileInfo> files = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
- return openStoreFiles(files, warmup);
- }
-
- private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
- throws IOException {
- if (CollectionUtils.isEmpty(files)) {
- return Collections.emptyList();
- }
- // initialize the thread pool for opening store files in parallel..
- ThreadPoolExecutor storeFileOpenerThreadPool =
- this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpener-" +
- this.getColumnFamilyName());
- CompletionService<HStoreFile> completionService =
- new ExecutorCompletionService<>(storeFileOpenerThreadPool);
-
- int totalValidStoreFile = 0;
- for (StoreFileInfo storeFileInfo : files) {
- // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
- // our store's CompoundConfiguration here.
- storeFileInfo.setConf(conf);
- // open each store file in parallel
- completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
- totalValidStoreFile++;
- }
-
- Set<String> compactedStoreFiles = new HashSet<>();
- ArrayList<HStoreFile> results = new ArrayList<>(files.size());
- IOException ioe = null;
- try {
- for (int i = 0; i < totalValidStoreFile; i++) {
- try {
- HStoreFile storeFile = completionService.take().get();
- if (storeFile != null) {
- LOG.debug("loaded {}", storeFile);
- results.add(storeFile);
- compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
- }
- } catch (InterruptedException e) {
- if (ioe == null) {
- ioe = new InterruptedIOException(e.getMessage());
- }
- } catch (ExecutionException e) {
- if (ioe == null) {
- ioe = new IOException(e.getCause());
- }
- }
- }
- } finally {
- storeFileOpenerThreadPool.shutdownNow();
- }
- if (ioe != null) {
- // close StoreFile readers
- boolean evictOnClose =
- getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
- for (HStoreFile file : results) {
- try {
- if (file != null) {
- file.closeStoreFile(evictOnClose);
- }
- } catch (IOException e) {
- LOG.warn("Could not close store file {}", file, e);
- }
- }
- throw ioe;
- }
-
- // Should not archive the compacted store files when region warmup. See HBASE-22163.
- if (!warmup) {
- // Remove the compacted files from result
- List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
- for (HStoreFile storeFile : results) {
- if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
- LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
- storeFile.getReader().close(storeFile.getCacheConf() != null ?
- storeFile.getCacheConf().shouldEvictOnClose() : true);
- filesToRemove.add(storeFile);
- }
- }
- results.removeAll(filesToRemove);
- if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
- LOG.debug("Moving the files {} to archive", filesToRemove);
- getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
- filesToRemove);
- }
- }
-
- return results;
+ private void postRefreshStoreFiles() throws IOException {
+ // Advance the memstore read point to be at least the new store files seqIds so that
+ // readers might pick it up. This assumes that the store is not getting any writes (otherwise
+ // in-flight transactions might be made visible)
+ getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
+ refreshStoreSizeAndTotalBytes();
}
@Override
public void refreshStoreFiles() throws IOException {
- Collection<StoreFileInfo> newFiles = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
- refreshStoreFilesInternal(newFiles);
+ storeEngine.refreshStoreFiles();
+ postRefreshStoreFiles();
}
/**
@@ -616,89 +506,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* region replicas to keep up to date with the primary region files.
*/
public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
- List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
- for (String file : newFiles) {
- storeFiles.add(getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file));
- }
- refreshStoreFilesInternal(storeFiles);
- }
-
- /**
- * Checks the underlying store files, and opens the files that have not
- * been opened, and removes the store file readers for store files no longer
- * available. Mainly used by secondary region replicas to keep up to date with
- * the primary region files.
- */
- private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
- StoreFileManager sfm = storeEngine.getStoreFileManager();
- Collection<HStoreFile> currentFiles = sfm.getStorefiles();
- Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
- if (currentFiles == null) {
- currentFiles = Collections.emptySet();
- }
- if (newFiles == null) {
- newFiles = Collections.emptySet();
- }
- if (compactedFiles == null) {
- compactedFiles = Collections.emptySet();
- }
-
- HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
- for (HStoreFile sf : currentFiles) {
- currentFilesSet.put(sf.getFileInfo(), sf);
- }
- HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
- for (HStoreFile sf : compactedFiles) {
- compactedFilesSet.put(sf.getFileInfo(), sf);
- }
-
- Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
- // Exclude the files that have already been compacted
- newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
- Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
- Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
-
- if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
- return;
- }
-
- LOG.info("Refreshing store files for " + this + " files to add: "
- + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
-
- Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
- for (StoreFileInfo sfi : toBeRemovedFiles) {
- toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
- }
-
- // try to open the files
- List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
-
- // propogate the file changes to the underlying store file manager
- replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
-
- // Advance the memstore read point to be at least the new store files seqIds so that
- // readers might pick it up. This assumes that the store is not getting any writes (otherwise
- // in-flight transactions might be made visible)
- if (!toBeAddedFiles.isEmpty()) {
- // we must have the max sequence id here as we do have several store files
- region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
- }
-
- refreshStoreSizeAndTotalBytes();
- }
-
- protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
- StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
- p, isPrimaryReplicaStore());
- return createStoreFileAndReader(info);
- }
-
- private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
- info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
- HStoreFile storeFile = new HStoreFile(info, getColumnFamilyDescriptor().getBloomFilterType(),
- getCacheConfig());
- storeFile.initReader();
- return storeFile;
+ storeEngine.refreshStoreFiles(newFiles);
+ postRefreshStoreFiles();
}
/**
@@ -721,7 +530,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* Adds a value to the memstore
*/
public void add(final Cell cell, MemStoreSizing memstoreSizing) {
- lock.readLock().lock();
+ storeEngine.readLock();
try {
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
@@ -729,7 +538,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
this.memstore.add(cell, memstoreSizing);
} finally {
- lock.readLock().unlock();
+ storeEngine.readUnlock();
currentParallelPutCount.decrementAndGet();
}
}
@@ -738,7 +547,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* Adds the specified value to the memstore
*/
public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
- lock.readLock().lock();
+ storeEngine.readLock();
try {
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
@@ -746,7 +555,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
memstore.add(cells, memstoreSizing);
} finally {
- lock.readLock().unlock();
+ storeEngine.readUnlock();
currentParallelPutCount.decrementAndGet();
}
}
@@ -869,17 +678,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
LOG.info("Loaded HFile " + srcPath + " into " + this + " as "
+ dstPath + " - updating store file list.");
- HStoreFile sf = createStoreFileAndReader(dstPath);
+ HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
bulkLoadHFile(sf);
- LOG.info("Successfully loaded {} into {} (new location: {})",
- srcPath, this, dstPath);
+ LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
return dstPath;
}
public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
- HStoreFile sf = createStoreFileAndReader(fileInfo);
+ HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
bulkLoadHFile(sf);
}
@@ -887,28 +695,75 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
StoreFileReader r = sf.getReader();
this.storeSize.addAndGet(r.length());
this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
-
- // Append the new storefile into the list
- this.lock.writeLock().lock();
- try {
- this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
- } finally {
- // We need the lock, as long as we are updating the storeFiles
- // or changing the memstore. Let us release it before calling
- // notifyChangeReadersObservers. See HBASE-4485 for a possible
- // deadlock scenario that could have happened if continue to hold
- // the lock.
- this.lock.writeLock().unlock();
- }
+ storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
+ });
LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
if (LOG.isTraceEnabled()) {
- String traceMessage = "BULK LOAD time,size,store size,store files ["
- + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
- + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
+ String traceMessage = "BULK LOAD time,size,store size,store files [" +
+ EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + "," +
+ storeEngine.getStoreFileManager().getStorefileCount() + "]";
LOG.trace(traceMessage);
}
}
+ private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
+ // Clear the store files first so that metrics don't find them.
+ ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
+ Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
+ // clear the compacted files
+ if (CollectionUtils.isNotEmpty(compactedfiles)) {
+ removeCompactedfiles(compactedfiles,
+ getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
+ }
+ if (!result.isEmpty()) {
+ // initialize the thread pool for closing store files in parallel.
+ ThreadPoolExecutor storeFileCloserThreadPool =
+ this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" +
+ this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
+
+ // close each store file in parallel
+ CompletionService<Void> completionService =
+ new ExecutorCompletionService<>(storeFileCloserThreadPool);
+ for (HStoreFile f : result) {
+ completionService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ boolean evictOnClose =
+ getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
+ f.closeStoreFile(evictOnClose);
+ return null;
+ }
+ });
+ }
+
+ IOException ioe = null;
+ try {
+ for (int i = 0; i < result.size(); i++) {
+ try {
+ Future<Void> future = completionService.take();
+ future.get();
+ } catch (InterruptedException e) {
+ if (ioe == null) {
+ ioe = new InterruptedIOException();
+ ioe.initCause(e);
+ }
+ } catch (ExecutionException e) {
+ if (ioe == null) {
+ ioe = new IOException(e.getCause());
+ }
+ }
+ }
+ } finally {
+ storeFileCloserThreadPool.shutdownNow();
+ }
+ if (ioe != null) {
+ throw ioe;
+ }
+ }
+ LOG.trace("Closed {}", this);
+ return result;
+ }
+
/**
* Close all the readers. We don't need to worry about subsequent requests because the Region
* holds a write lock that will prevent any more reads or writes.
@@ -916,67 +771,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* @throws IOException on failure
*/
public ImmutableCollection<HStoreFile> close() throws IOException {
+ // findbugs cannot tell that storeEngine.writeLock() is just a lock operation, so it would
+ // report UL_UNRELEASED_LOCK_EXCEPTION_PATH; therefore we use two nested try-finally blocks.
+ // Change later if findbugs becomes smarter in the future.
this.archiveLock.lock();
- this.lock.writeLock().lock();
try {
- // Clear so metrics doesn't find them.
- ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
- Collection<HStoreFile> compactedfiles =
- storeEngine.getStoreFileManager().clearCompactedFiles();
- // clear the compacted files
- if (CollectionUtils.isNotEmpty(compactedfiles)) {
- removeCompactedfiles(compactedfiles, getCacheConfig() != null ?
- getCacheConfig().shouldEvictOnClose() : true);
- }
- if (!result.isEmpty()) {
- // initialize the thread pool for closing store files in parallel.
- ThreadPoolExecutor storeFileCloserThreadPool = this.region
- .getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
- + this.getColumnFamilyName());
-
- // close each store file in parallel
- CompletionService<Void> completionService =
- new ExecutorCompletionService<>(storeFileCloserThreadPool);
- for (HStoreFile f : result) {
- completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws IOException {
- boolean evictOnClose =
- getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
- f.closeStoreFile(evictOnClose);
- return null;
- }
- });
- }
-
- IOException ioe = null;
- try {
- for (int i = 0; i < result.size(); i++) {
- try {
- Future<Void> future = completionService.take();
- future.get();
- } catch (InterruptedException e) {
- if (ioe == null) {
- ioe = new InterruptedIOException();
- ioe.initCause(e);
- }
- } catch (ExecutionException e) {
- if (ioe == null) {
- ioe = new IOException(e.getCause());
- }
- }
- }
- } finally {
- storeFileCloserThreadPool.shutdownNow();
- }
- if (ioe != null) {
- throw ioe;
- }
+ this.storeEngine.writeLock();
+ try {
+ return closeWithoutLock();
+ } finally {
+ this.storeEngine.writeUnlock();
}
- LOG.trace("Closed {}", this);
- return result;
} finally {
- this.lock.writeLock().unlock();
this.archiveLock.unlock();
}
}
@@ -1006,7 +812,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
try {
for (Path pathName : pathNames) {
lastPathName = pathName;
- validateStoreFile(pathName);
+ storeEngine.validateStoreFile(pathName);
}
return pathNames;
} catch (Exception e) {
@@ -1052,204 +858,37 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
- HStoreFile sf = createStoreFileAndReader(dstPath);
+ HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
StoreFileReader r = sf.getReader();
this.storeSize.addAndGet(r.length());
this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
- this.lock.writeLock().lock();
- try {
- this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
- } finally {
- this.lock.writeLock().unlock();
- }
+ storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
+ });
LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
return sf;
}
- /**
- * Commit the given {@code files}.
- * <p/>
- * We will move the file into data directory, and open it.
- * @param files the files want to commit
- * @param validate whether to validate the store files
- * @return the committed store files
- */
- private List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
- List<HStoreFile> committedFiles = new ArrayList<>(files.size());
- HRegionFileSystem hfs = getRegionFileSystem();
- String familyName = getColumnFamilyName();
- for (Path file : files) {
- try {
- if (validate) {
- validateStoreFile(file);
- }
- Path committedPath = hfs.commitStoreFile(familyName, file);
- HStoreFile sf = createStoreFileAndReader(committedPath);
- committedFiles.add(sf);
- } catch (IOException e) {
- LOG.error("Failed to commit store file {}", file, e);
- // Try to delete the files we have committed before.
- // It is OK to fail when deleting as leaving the file there does not cause any data
- // corruption problem. It just introduces some duplicated data which may impact read
- // performance a little when reading before compaction.
- for (HStoreFile sf : committedFiles) {
- Path pathToDelete = sf.getPath();
- try {
- sf.deleteStoreFile();
- } catch (IOException deleteEx) {
- LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
- deleteEx);
- }
- }
- throw new IOException("Failed to commit the flush", e);
- }
- }
- return committedFiles;
- }
-
- public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
- boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
- boolean shouldDropBehind) throws IOException {
- return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
- includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
- }
-
- /**
- * @param compression Compression algorithm to use
- * @param isCompaction whether we are creating a new file in a compaction
- * @param includeMVCCReadpoint - whether to include MVCC or not
- * @param includesTag - includesTag or not
- * @return Writer for a new StoreFile in the tmp dir.
- */
- // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
- // compaction
- public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
- boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
- boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
- throws IOException {
- // creating new cache config for each new writer
- final CacheConfig cacheConf = getCacheConfig();
- final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
- if (isCompaction) {
- // Don't cache data on write on compactions, unless specifically configured to do so
- // Cache only when total file size remains lower than configured threshold
- final boolean cacheCompactedBlocksOnWrite =
- getCacheConfig().shouldCacheCompactedBlocksOnWrite();
- // if data blocks are to be cached on write
- // during compaction, we should forcefully
- // cache index and bloom blocks as well
- if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
- .getCacheCompactedBlocksOnWriteThreshold()) {
- writerCacheConf.enableCacheOnWrite();
- if (!cacheOnWriteLogged) {
- LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
- "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
- cacheOnWriteLogged = true;
- }
- } else {
- writerCacheConf.setCacheDataOnWrite(false);
- if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
- // checking condition once again for logging
- LOG.debug(
- "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
- + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
- this, totalCompactedFilesSize,
- cacheConf.getCacheCompactedBlocksOnWriteThreshold());
- }
- }
- } else {
- final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
- if (shouldCacheDataOnWrite) {
- writerCacheConf.enableCacheOnWrite();
- if (!cacheOnWriteLogged) {
- LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
- "Index blocks and Bloom filter blocks", this);
- cacheOnWriteLogged = true;
- }
- }
- }
- Encryption.Context encryptionContext = storeContext.getEncryptionContext();
- HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
- encryptionContext);
- Path familyTempDir = new Path(getRegionFileSystem().getTempDir(), getColumnFamilyName());
- StoreFileWriter.Builder builder =
- new StoreFileWriter.Builder(conf, writerCacheConf, getFileSystem())
- .withOutputDir(familyTempDir)
- .withBloomType(storeContext.getBloomFilterType())
- .withMaxKeyCount(maxKeyCount)
- .withFavoredNodes(storeContext.getFavoredNodes())
- .withFileContext(hFileContext)
- .withShouldDropCacheBehind(shouldDropBehind)
- .withCompactedFilesSupplier(storeContext.getCompactedFilesSupplier())
- .withFileStoragePolicy(fileStoragePolicy);
- return builder.build();
- }
-
- private HFileContext createFileContext(Compression.Algorithm compression,
- boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
- if (compression == null) {
- compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
- }
- ColumnFamilyDescriptor family = getColumnFamilyDescriptor();
- HFileContext hFileContext = new HFileContextBuilder()
- .withIncludesMvcc(includeMVCCReadpoint)
- .withIncludesTags(includesTag)
- .withCompression(compression)
- .withCompressTags(family.isCompressTags())
- .withChecksumType(StoreUtils.getChecksumType(conf))
- .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
- .withBlockSize(family.getBlocksize())
- .withHBaseCheckSum(true)
- .withDataBlockEncoding(family.getDataBlockEncoding())
- .withEncryptionContext(encryptionContext)
- .withCreateTime(EnvironmentEdgeManager.currentTime())
- .withColumnFamily(getColumnFamilyDescriptor().getName())
- .withTableName(getTableName().getName())
- .withCellComparator(getComparator())
- .build();
- return hFileContext;
- }
-
private long getTotalSize(Collection<HStoreFile> sfs) {
return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
}
- /**
- * Change storeFiles adding into place the Reader produced by this new flush.
- * @param sfs Store files
- * @return Whether compaction is required.
- */
- private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
- this.lock.writeLock().lock();
- try {
- this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
- /**
- * NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
- * close {@link DefaultMemStore#snapshot}, which may be used by
- * {@link DefaultMemStore#getScanners}.
- */
- if (snapshotId > 0) {
- this.memstore.clearSnapshot(snapshotId);
- }
- } finally {
- // We need the lock, as long as we are updating the storeFiles
- // or changing the memstore. Let us release it before calling
- // notifyChangeReadersObservers. See HBASE-4485 for a possible
- // deadlock scenario that could have happened if continue to hold
- // the lock.
- this.lock.writeLock().unlock();
- }
-
+ private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
+ // NOTE: we should keep the clearSnapshot call inside the write lock because clearSnapshot may
+ // close DefaultMemStore#snapshot, which may be used by DefaultMemStore#getScanners.
+ storeEngine.addStoreFiles(sfs,
+ snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
+ });
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
long totalSize = getTotalSize(sfs);
- String traceMessage = "FLUSH time,count,size,store size,store files ["
- + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
- + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
+ String traceMessage = "FLUSH time,count,size,store size,store files [" +
+ EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + "," +
+ storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
LOG.trace(traceMessage);
}
return needsCompaction();
@@ -1261,11 +900,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
for (ChangedReadersObserver o : this.changedReaderObservers) {
List<KeyValueScanner> memStoreScanners;
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
memStoreScanners = this.memstore.getScanners(o.getReadPoint());
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
o.updateReaders(sfs, memStoreScanners);
}
@@ -1307,13 +946,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
Collection<HStoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners;
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
try {
@@ -1390,11 +1029,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
boolean includeMemstoreScanner) throws IOException {
List<KeyValueScanner> memStoreScanners = null;
if (includeMemstoreScanner) {
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
memStoreScanners = this.memstore.getScanners(readPt);
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
}
try {
@@ -1510,14 +1149,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
- List<HStoreFile> sfs = commitStoreFiles(newFiles, true);
+ List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
if (this.getCoprocessorHost() != null) {
for (HStoreFile sf : sfs) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
}
}
- writeCompactionWalRecord(filesToCompact, sfs);
- replaceStoreFiles(filesToCompact, sfs);
+ replaceStoreFiles(filesToCompact, sfs, true);
if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -1581,25 +1219,24 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
}
- void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
- throws IOException {
- this.lock.writeLock().lock();
- try {
- this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
- synchronized (filesCompacting) {
- filesCompacting.removeAll(compactedFiles);
- }
-
- // These may be null when the RS is shutting down. The space quota Chores will fix the Region
- // sizes later so it's not super-critical if we miss these.
- RegionServerServices rsServices = region.getRegionServerServices();
- if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
- updateSpaceQuotaAfterFileReplacement(
- rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
- compactedFiles, result);
- }
- } finally {
- this.lock.writeLock().unlock();
+ @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
+ allowedOnPath = ".*/(HStore|TestHStore).java")
+ void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
+ boolean writeCompactionMarker) throws IOException {
+ storeEngine.replaceStoreFiles(compactedFiles, result);
+ if (writeCompactionMarker) {
+ writeCompactionWalRecord(compactedFiles, result);
+ }
+ synchronized (filesCompacting) {
+ filesCompacting.removeAll(compactedFiles);
+ }
+ // These may be null when the RS is shutting down. The space quota Chores will fix the Region
+ // sizes later so it's not super-critical if we miss these.
+ RegionServerServices rsServices = region.getRegionServerServices();
+ if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
+ updateSpaceQuotaAfterFileReplacement(
+ rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
+ compactedFiles, result);
}
}
@@ -1722,7 +1359,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
for (String compactionOutput : compactionOutputs) {
StoreFileInfo storeFileInfo =
getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
- HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+ HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
outputStoreFiles.add(storeFile);
}
}
@@ -1730,7 +1367,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
LOG.info("Replaying compaction marker, replacing input files: " +
inputStoreFiles + " with output files : " + outputStoreFiles);
- this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
+ this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
this.refreshStoreSizeAndTotalBytes();
}
}
@@ -1739,14 +1376,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
public boolean hasReferences() {
// Grab the read lock here, because we need to ensure that: only when the atomic
// replaceStoreFiles(..) finished, we can get all the complete store file list.
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
// Merge the current store files with compacted files here due to HBASE-20940.
Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
allStoreFiles.addAll(getCompactedFiles());
return StoreUtils.hasReferences(allStoreFiles);
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
}
@@ -1786,7 +1423,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
final CompactionContext compaction = storeEngine.createCompaction();
CompactionRequestImpl request = null;
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
synchronized (filesCompacting) {
// First, see if coprocessor would want to override selection.
@@ -1859,7 +1496,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
request.setTracker(tracker);
}
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
if (LOG.isDebugEnabled()) {
@@ -1892,7 +1529,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this, getColumnFamilyDescriptor().getMinVersions());
return;
}
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
Collection<HStoreFile> delSfs = null;
try {
synchronized (filesCompacting) {
@@ -1904,7 +1541,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
}
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
if (CollectionUtils.isEmpty(delSfs)) {
@@ -1912,8 +1549,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
- writeCompactionWalRecord(delSfs, newFiles);
- replaceStoreFiles(delSfs, newFiles);
+ replaceStoreFiles(delSfs, newFiles, true);
refreshStoreSizeAndTotalBytes();
LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
+ this + "; total size is "
@@ -1936,25 +1572,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
/**
- * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
- * operation.
- * @param path the path to the store file
- */
- private void validateStoreFile(Path path) throws IOException {
- HStoreFile storeFile = null;
- try {
- storeFile = createStoreFileAndReader(path);
- } catch (IOException e) {
- LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
- throw e;
- } finally {
- if (storeFile != null) {
- storeFile.closeStoreFile(false);
- }
- }
- }
-
- /**
* Update counts.
*/
protected void refreshStoreSizeAndTotalBytes()
@@ -1999,7 +1616,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* Determines if Store should be split.
*/
public Optional<byte[]> getSplitPoint() {
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
// Should already be enforced by the split policy!
assert !this.getRegionInfo().isMetaRegion();
@@ -2012,7 +1629,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
} catch(IOException e) {
LOG.warn("Failed getting store size for {}", this, e);
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
return Optional.empty();
}
@@ -2045,7 +1662,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
*/
public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
throws IOException {
- lock.readLock().lock();
+ storeEngine.readLock();
try {
ScanInfo scanInfo;
if (this.getCoprocessorHost() != null) {
@@ -2055,7 +1672,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
return createScanner(scan, scanInfo, targetCols, readPt);
} finally {
- lock.readLock().unlock();
+ storeEngine.readUnlock();
}
}
@@ -2085,7 +1702,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
Map<String, HStoreFile> name2File =
new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
@@ -2110,7 +1727,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
includeStartRow, stopRow, includeStopRow, readPt, false);
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
}
@@ -2176,41 +1793,20 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
@Override
public long getStorefilesSize() {
// Include all StoreFiles
- return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true);
+ return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
+ sf -> true);
}
@Override
public long getHFilesSize() {
// Include only StoreFiles which are HFiles
- return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
+ return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
HStoreFile::isHFile);
}
- private long getTotalUncompressedBytes(List<HStoreFile> files) {
- return files.stream()
- .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes))
- .sum();
- }
-
- private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) {
- return files.stream().filter(predicate)
- .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum();
- }
-
- private long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) {
- if (file == null) {
- return 0L;
- }
- StoreFileReader reader = file.getReader();
- if (reader == null) {
- return 0L;
- }
- return f.applyAsLong(reader);
- }
-
private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
return this.storeEngine.getStoreFileManager().getStorefiles().stream()
- .mapToLong(file -> getStorefileFieldSize(file, f)).sum();
+ .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
}
@Override
@@ -2281,11 +1877,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
*/
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
throws IOException {
- this.lock.readLock().lock();
+ this.storeEngine.readLock();
try {
this.memstore.upsert(cells, readpoint, memstoreSizing);
} finally {
- this.lock.readLock().unlock();
+ this.storeEngine.readUnlock();
}
}
@@ -2338,7 +1934,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return false;
}
status.setStatus("Flushing " + this + ": reopening flushed file");
- List<HStoreFile> storeFiles = commitStoreFiles(tempFiles, false);
+ List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
for (HStoreFile sf : storeFiles) {
StoreFileReader r = sf.getReader();
if (LOG.isInfoEnabled()) {
@@ -2361,7 +1957,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
}
// Add new file to store files. Clear snapshot too while we have the Store write lock.
- return updateStorefiles(storeFiles, snapshot.getId());
+ return completeFlush(storeFiles, snapshot.getId());
}
@Override
@@ -2389,7 +1985,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// open the file as a store file (hfile link, etc)
StoreFileInfo storeFileInfo =
getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
- HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+ HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
storeFiles.add(storeFile);
HStore.this.storeSize.addAndGet(storeFile.getReader().length());
HStore.this.totalUncompressedBytes
@@ -2405,7 +2001,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
if (dropMemstoreSnapshot && snapshot != null) {
snapshotId = snapshot.getId();
}
- HStore.this.updateStorefiles(storeFiles, snapshotId);
+ HStore.this.completeFlush(storeFiles, snapshotId);
}
/**
@@ -2414,7 +2010,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
@Override
public void abort() throws IOException {
if (snapshot != null) {
- HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
+ HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
}
}
}
@@ -2577,7 +2173,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// ensure other threads do not attempt to archive the same files on close()
archiveLock.lock();
try {
- lock.readLock().lock();
+ storeEngine.readLock();
Collection<HStoreFile> copyCompactedfiles = null;
try {
Collection<HStoreFile> compactedfiles =
@@ -2589,7 +2185,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
LOG.trace("No compacted files to archive");
}
} finally {
- lock.readLock().unlock();
+ storeEngine.readUnlock();
}
if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
removeCompactedfiles(copyCompactedfiles, true);
@@ -2724,12 +2320,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
- try {
- lock.writeLock().lock();
- this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
- } finally {
- lock.writeLock().unlock();
- }
+ storeEngine.removeCompactedFiles(filesToRemove);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
index 2623350..2a9f968 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -108,6 +109,14 @@ public final class StoreContext implements HeapSize {
return coprocessorHost;
}
+ public RegionInfo getRegionInfo() {
+ return regionFileSystem.getRegionInfo();
+ }
+
+ public boolean isPrimaryReplicaStore() {
+ return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
+ }
+
public static Builder getBuilder() {
return new Builder();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 60b3c3d..4033c33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -19,38 +19,131 @@
package org.apache.hadoop.hbase.regionserver;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
- * StoreEngine is a factory that can create the objects necessary for HStore to operate.
- * Since not all compaction policies, compactors and store file managers are compatible,
- * they are tied together and replaced together via StoreEngine-s.
+ * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
+ * all compaction policies, compactors and store file managers are compatible, they are tied
+ * together and replaced together via StoreEngine-s.
+ * <p/>
+ * We expose read write lock methods to upper layer for store operations:<br/>
+ * <ul>
+ * <li>Locked in shared mode when the list of component stores is looked at:
+ * <ul>
+ * <li>all reads/writes to table data</li>
+ * <li>checking for split</li>
+ * </ul>
+ * </li>
+ * <li>Locked in exclusive mode when the list of component stores is modified:
+ * <ul>
+ * <li>closing</li>
+ * <li>completing a compaction</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>
+ * It is a bit confusing that we have a StoreFileManager(SFM) and then a StoreFileTracker(SFT). As
+ * its name says, SFT is used to track the store files list. The reason why we have a SFT beside SFM
+ * is that, when introducing stripe compaction, we introduced the StoreEngine and also the SFM, but
+ * actually, the SFM here is not a general 'Manager', it is only designed to manage the in memory
+ * 'stripes', so we can select different store files when scanning or compacting. The 'tracking' of
+ * store files is actually done in {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem}
+ * and {@link HStore} before we had SFT. And since SFM is designed to only hold in-memory state,
+ * we will hold write lock when updating it, the lock is also used to protect the normal read/write
+ * requests. This means we'd better not add IO operations to SFM. And also, no matter what the in
+ * memory state is, stripe or not, it does not affect how we track the store files. So, given all
+ * these facts, here we introduce a separate SFT to track the store files.
+ * <p/>
+ * Here, since we always need to update SFM and SFT almost at the same time, we introduce methods in
+ * StoreEngine directly to update them both, so the upper layer just needs to update StoreEngine
+ * once, which reduces possible misuse.
*/
@InterfaceAudience.Private
-public abstract class StoreEngine<SF extends StoreFlusher,
- CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
+ C extends Compactor, SFM extends StoreFileManager> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
+
protected SF storeFlusher;
protected CP compactionPolicy;
protected C compactor;
protected SFM storeFileManager;
+ private Configuration conf;
+ private StoreContext ctx;
+ private RegionCoprocessorHost coprocessorHost;
+ private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
+ private StoreFileTracker storeFileTracker;
+
+ private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
/**
- * The name of the configuration parameter that specifies the class of
- * a store engine that is used to manage and compact HBase store files.
+ * The name of the configuration parameter that specifies the class of a store engine that is used
+ * to manage and compact HBase store files.
*/
public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
- private static final Class<? extends StoreEngine<?, ?, ?, ?>>
- DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
+ private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
+ DefaultStoreEngine.class;
+
+ /**
+ * Acquire read lock of this store.
+ */
+ public void readLock() {
+ storeLock.readLock().lock();
+ }
+
+ /**
+ * Release read lock of this store.
+ */
+ public void readUnlock() {
+ storeLock.readLock().unlock();
+ }
+
+ /**
+ * Acquire write lock of this store.
+ */
+ public void writeLock() {
+ storeLock.writeLock().lock();
+ }
+
+ /**
+ * Release write lock of this store.
+ */
+ public void writeUnlock() {
+ storeLock.writeLock().unlock();
+ }
/**
* @return Compaction policy to use.
@@ -80,6 +173,11 @@ public abstract class StoreEngine<SF extends StoreFlusher,
return this.storeFlusher;
}
+ private StoreFileTracker createStoreFileTracker(HStore store) {
+ return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
+ store.isPrimaryReplicaStore(), store.getStoreContext());
+ }
+
/**
* @param filesCompacting Files currently compacting
* @return whether a compaction selection is possible
@@ -87,8 +185,8 @@ public abstract class StoreEngine<SF extends StoreFlusher,
public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);
/**
- * Creates an instance of a compaction context specific to this engine.
- * Doesn't actually select or start a compaction. See CompactionContext class comment.
+ * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
+ * start a compaction. See CompactionContext class comment.
* @return New CompactionContext object.
*/
public abstract CompactionContext createCompaction() throws IOException;
@@ -96,36 +194,347 @@ public abstract class StoreEngine<SF extends StoreFlusher,
/**
* Create the StoreEngine's components.
*/
- protected abstract void createComponents(
- Configuration conf, HStore store, CellComparator cellComparator) throws IOException;
+ protected abstract void createComponents(Configuration conf, HStore store,
+ CellComparator cellComparator) throws IOException;
- private void createComponentsOnce(
- Configuration conf, HStore store, CellComparator cellComparator) throws IOException {
- assert compactor == null && compactionPolicy == null
- && storeFileManager == null && storeFlusher == null;
+ protected final void createComponentsOnce(Configuration conf, HStore store,
+ CellComparator cellComparator) throws IOException {
+ assert compactor == null && compactionPolicy == null && storeFileManager == null &&
+ storeFlusher == null && storeFileTracker == null;
createComponents(conf, store, cellComparator);
- assert compactor != null && compactionPolicy != null
- && storeFileManager != null && storeFlusher != null;
+ this.conf = conf;
+ this.ctx = store.getStoreContext();
+ this.coprocessorHost = store.getHRegion().getCoprocessorHost();
+ this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
+ this.storeFileTracker = createStoreFileTracker(store);
+ assert compactor != null && compactionPolicy != null && storeFileManager != null &&
+ storeFlusher != null && storeFileTracker != null;
+ }
+
+ /**
+ * Create a writer for writing new store files.
+ * @return Writer for a new StoreFile
+ */
+ public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
+ return storeFileTracker.createWriter(params);
+ }
+
+ public HStoreFile createStoreFileAndReader(Path p) throws IOException {
+ StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
+ ctx.isPrimaryReplicaStore());
+ return createStoreFileAndReader(info);
+ }
+
+ public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
+ info.setRegionCoprocessorHost(coprocessorHost);
+ HStoreFile storeFile =
+ new HStoreFile(info, ctx.getFamily().getBloomFilterType(), ctx.getCacheConf());
+ storeFile.initReader();
+ return storeFile;
+ }
+
+ /**
+ * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
+ * operation.
+ * @param path the path to the store file
+ */
+ public void validateStoreFile(Path path) throws IOException {
+ HStoreFile storeFile = null;
+ try {
+ storeFile = createStoreFileAndReader(path);
+ } catch (IOException e) {
+ LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
+ throw e;
+ } finally {
+ if (storeFile != null) {
+ storeFile.closeStoreFile(false);
+ }
+ }
+ }
+
+ private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
+ throws IOException {
+ if (CollectionUtils.isEmpty(files)) {
+ return Collections.emptyList();
+ }
+ // initialize the thread pool for opening store files in parallel.
+ ExecutorService storeFileOpenerThreadPool =
+ openStoreFileThreadPoolCreator.apply("StoreFileOpener-" +
+ ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString());
+ CompletionService<HStoreFile> completionService =
+ new ExecutorCompletionService<>(storeFileOpenerThreadPool);
+
+ int totalValidStoreFile = 0;
+ for (StoreFileInfo storeFileInfo : files) {
+ // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
+ // our store's CompoundConfiguration here.
+ storeFileInfo.setConf(conf);
+ // open each store file in parallel
+ completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
+ totalValidStoreFile++;
+ }
+
+ Set<String> compactedStoreFiles = new HashSet<>();
+ ArrayList<HStoreFile> results = new ArrayList<>(files.size());
+ IOException ioe = null;
+ try {
+ for (int i = 0; i < totalValidStoreFile; i++) {
+ try {
+ HStoreFile storeFile = completionService.take().get();
+ if (storeFile != null) {
+ LOG.debug("loaded {}", storeFile);
+ results.add(storeFile);
+ compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
+ }
+ } catch (InterruptedException e) {
+ if (ioe == null) {
+ ioe = new InterruptedIOException(e.getMessage());
+ }
+ } catch (ExecutionException e) {
+ if (ioe == null) {
+ ioe = new IOException(e.getCause());
+ }
+ }
+ }
+ } finally {
+ storeFileOpenerThreadPool.shutdownNow();
+ }
+ if (ioe != null) {
+ // close StoreFile readers
+ boolean evictOnClose =
+ ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
+ for (HStoreFile file : results) {
+ try {
+ if (file != null) {
+ file.closeStoreFile(evictOnClose);
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not close store file {}", file, e);
+ }
+ }
+ throw ioe;
+ }
+
+ // Should not archive the compacted store files during region warmup. See HBASE-22163.
+ if (!warmup) {
+ // Remove the compacted files from result
+ List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
+ for (HStoreFile storeFile : results) {
+ if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
+ LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
+ storeFile.getReader().close(
+ storeFile.getCacheConf() != null ? storeFile.getCacheConf().shouldEvictOnClose() :
+ true);
+ filesToRemove.add(storeFile);
+ }
+ }
+ results.removeAll(filesToRemove);
+ if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
+ LOG.debug("Moving the files {} to archive", filesToRemove);
+ ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
+ filesToRemove);
+ }
+ }
+
+ return results;
+ }
+
+ public void initialize(boolean warmup) throws IOException {
+ List<StoreFileInfo> fileInfos = storeFileTracker.load();
+ List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
+ storeFileManager.loadFiles(files);
+ }
+
+ public void refreshStoreFiles() throws IOException {
+ List<StoreFileInfo> fileInfos = storeFileTracker.load();
+ refreshStoreFilesInternal(fileInfos);
+ }
+
+ public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
+ List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
+ for (String file : newFiles) {
+ storeFiles
+ .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
+ }
+ refreshStoreFilesInternal(storeFiles);
+ }
+
+ /**
+ * Checks the underlying store files, and opens the files that have not been opened, and removes
+ * the store file readers for store files no longer available. Mainly used by secondary region
+ * replicas to keep up to date with the primary region files.
+ */
+ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
+ Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
+ Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
+ if (currentFiles == null) {
+ currentFiles = Collections.emptySet();
+ }
+ if (newFiles == null) {
+ newFiles = Collections.emptySet();
+ }
+ if (compactedFiles == null) {
+ compactedFiles = Collections.emptySet();
+ }
+
+ HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
+ for (HStoreFile sf : currentFiles) {
+ currentFilesSet.put(sf.getFileInfo(), sf);
+ }
+ HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
+ for (HStoreFile sf : compactedFiles) {
+ compactedFilesSet.put(sf.getFileInfo(), sf);
+ }
+
+ Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+ // Exclude the files that have already been compacted
+ newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
+ Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
+ Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
+
+ if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
+ return;
+ }
+
+ LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles +
+ " files to remove: " + toBeRemovedFiles);
+
+ Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
+ for (StoreFileInfo sfi : toBeRemovedFiles) {
+ toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
+ }
+
+ // try to open the files
+ List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
+
+ // propagate the file changes to the underlying store file manager
+ replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception
+ }
+
+ /**
+ * Commit the given {@code files}.
+ * <p/>
+ * We will move the file into data directory, and open it.
+ * @param files the files to commit
+ * @param validate whether to validate the store files
+ * @return the committed store files
+ */
+ public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
+ List<HStoreFile> committedFiles = new ArrayList<>(files.size());
+ HRegionFileSystem hfs = ctx.getRegionFileSystem();
+ String familyName = ctx.getFamily().getNameAsString();
+ Path storeDir = hfs.getStoreDir(familyName);
+ for (Path file : files) {
+ try {
+ if (validate) {
+ validateStoreFile(file);
+ }
+ Path committedPath;
+ // As we want to support writing to data directory directly, here we need to check whether
+ // the store file is already in the right place
+ if (file.getParent() != null && file.getParent().equals(storeDir)) {
+ // already in the right place, skip renaming
+ committedPath = file;
+ } else {
+ // Write-out finished successfully, move into the right spot
+ committedPath = hfs.commitStoreFile(familyName, file);
+ }
+ HStoreFile sf = createStoreFileAndReader(committedPath);
+ committedFiles.add(sf);
+ } catch (IOException e) {
+ LOG.error("Failed to commit store file {}", file, e);
+ // Try to delete the files we have committed before.
+ // It is OK to fail when deleting as leaving the file there does not cause any data
+ // corruption problem. It just introduces some duplicated data which may impact read
+ // performance a little when reading before compaction.
+ for (HStoreFile sf : committedFiles) {
+ Path pathToDelete = sf.getPath();
+ try {
+ sf.deleteStoreFile();
+ } catch (IOException deleteEx) {
+ LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
+ deleteEx);
+ }
+ }
+ throw new IOException("Failed to commit the flush", e);
+ }
+ }
+ return committedFiles;
+ }
+
+ @FunctionalInterface
+ public interface IOExceptionRunnable {
+ void run() throws IOException;
+ }
+
+ /**
+ * Add the store files to store file manager, and also record it in the store file tracker.
+ * <p/>
+ * The {@code actionAfterAdding} will be executed after the insertion to store file manager, under
+ * the lock protection. Usually this is for clearing the memstore snapshot.
+ */
+ public void addStoreFiles(Collection<HStoreFile> storeFiles,
+ IOExceptionRunnable actionAfterAdding) throws IOException {
+ storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
+ writeLock();
+ try {
+ storeFileManager.insertNewFiles(storeFiles);
+ actionAfterAdding.run();
+ } finally {
+ // We need the lock, as long as we are updating the storeFiles
+ // or changing the memstore. Let us release it before calling
+ // notifyChangeReadersObservers. See HBASE-4485 for a possible
+ // deadlock scenario that could have happened if continue to hold
+ // the lock.
+ writeUnlock();
+ }
+ }
+
+ public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
+ Collection<HStoreFile> newFiles) throws IOException {
+ storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
+ StoreUtils.toStoreFileInfo(newFiles));
+ writeLock();
+ try {
+ storeFileManager.addCompactionResults(compactedFiles, newFiles);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
+ writeLock();
+ try {
+ storeFileManager.removeCompactedFiles(compactedFiles);
+ } finally {
+ writeUnlock();
+ }
}
/**
* Create the StoreEngine configured for the given Store.
- * @param store The store. An unfortunate dependency needed due to it
- * being passed to coprocessors via the compactor.
+ * @param store The store. An unfortunate dependency needed due to it being passed to coprocessors
+ * via the compactor.
* @param conf Store configuration.
* @param cellComparator CellComparator for storeFileManager.
* @return StoreEngine to use.
*/
- public static StoreEngine<?, ?, ?, ?> create(
- HStore store, Configuration conf, CellComparator cellComparator) throws IOException {
+ public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
+ CellComparator cellComparator) throws IOException {
String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
try {
- StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
- className, new Class[] { }, new Object[] { });
+ StoreEngine<?, ?, ?, ?> se =
+ ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
se.createComponentsOnce(conf, store, cellComparator);
return se;
} catch (Exception e) {
throw new IOException("Unable to load configured store engine '" + className + "'", e);
}
}
+
+ @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
+ allowedOnPath = ".*/TestHStore.java")
+ ReadWriteLock getLock() {
+ return storeLock;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 27127f3..a40b209 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
@@ -49,12 +50,16 @@ public interface StoreFileManager {
* Loads the initial store files into empty StoreFileManager.
* @param storeFiles The files to load.
*/
+ @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+ allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void loadFiles(List<HStoreFile> storeFiles);
/**
* Adds new files, either for from MemStore flush or bulk insert, into the structure.
* @param sfs New store files.
*/
+ @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+ allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void insertNewFiles(Collection<HStoreFile> sfs);
/**
@@ -62,12 +67,16 @@ public interface StoreFileManager {
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
*/
+ @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+ allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void addCompactionResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results);
/**
* Remove the compacted files
* @param compactedFiles the list of compacted files
*/
+ @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+ allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
void removeCompactedFiles(Collection<HStoreFile> compactedFiles);
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 67eb375..1095854 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -69,10 +69,17 @@ abstract class StoreFlusher {
writer.close();
}
+ protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
+ throws IOException {
+ return store.getStoreEngine()
+ .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
+ .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
+ .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
+ .shouldDropBehind(false));
+ }
/**
* Creates the scanner for flushing snapshot. Also calls coprocessors.
- * @param snapshotScanners
* @return The scanner; null if coprocessor is canceling the flush.
*/
protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
index 454b244..10a9330 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
-
+import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -42,10 +45,13 @@ import org.slf4j.LoggerFactory;
* Utility functions for region server storage layer.
*/
@InterfaceAudience.Private
-public class StoreUtils {
+public final class StoreUtils {
private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
+ private StoreUtils() {
+ }
+
/**
* Creates a deterministic hash code for store file collection.
*/
@@ -171,4 +177,31 @@ public class StoreUtils {
return new CompoundConfiguration().add(conf).addBytesMap(td.getValues())
.addStringMap(cfd.getConfiguration()).addBytesMap(cfd.getValues());
}
+
+ public static List<StoreFileInfo> toStoreFileInfo(Collection<HStoreFile> storefiles) {
+ return storefiles.stream().map(HStoreFile::getFileInfo).collect(Collectors.toList());
+ }
+
+ public static long getTotalUncompressedBytes(List<HStoreFile> files) {
+ return files.stream()
+ .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes))
+ .sum();
+ }
+
+ public static long getStorefilesSize(Collection<HStoreFile> files,
+ Predicate<HStoreFile> predicate) {
+ return files.stream().filter(predicate)
+ .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum();
+ }
+
+ public static long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) {
+ if (file == null) {
+ return 0L;
+ }
+ StoreFileReader reader = file.getReader();
+ if (reader == null) {
+ return 0L;
+ }
+ return f.applyAsLong(reader);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index 14863a6..bfb3f64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -20,20 +20,19 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 1560aef..f8183b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -70,7 +70,7 @@ public class StripeStoreFlusher extends StoreFlusher {
StripeMultiFileWriter mw = null;
try {
mw = req.createWriter(); // Writer according to the policy.
- StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
+ StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory);
@@ -98,13 +98,12 @@ public class StripeStoreFlusher extends StoreFlusher {
return result;
}
- private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
+ private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
return new StripeMultiFileWriter.WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
- StoreFileWriter writer = store.createWriterInTmp(kvCount,
- store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
- return writer;
+ // XXX: it used to always pass true for includesTag, re-consider?
+ return StripeStoreFlusher.this.createWriter(snapshot, true);
}
};
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 42841bf..533be17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -51,13 +51,14 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
WriterFactory writerFactory = new WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
- return createTmpWriter(fd, shouldDropBehind, major);
+ return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
}
@Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
- throws IOException {
- return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy, major);
+ throws IOException {
+ return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
+ fileStoragePolicy, major);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 7f70e02..8178fb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -60,6 +60,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
@@ -260,29 +261,32 @@ public abstract class Compactor<T extends CellSink> {
}
};
+ protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
+ boolean major) {
+ return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
+ .compression(major ? majorCompactionCompression : minorCompactionCompression)
+ .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
+ .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
+ .totalCompactedFilesSize(fd.totalCompactedFilesSize);
+ }
+
/**
- * Creates a writer for a new file in a temporary directory.
+ * Creates a writer for a new file.
* @param fd The file details.
- * @return Writer for a new StoreFile in the tmp dir.
+ * @return Writer for a new StoreFile
* @throws IOException if creation failed
*/
- protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
- throws IOException {
+ protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
+ boolean major) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
- return store.createWriterInTmp(fd.maxKeyCount,
- major ? majorCompactionCompression : minorCompactionCompression,
- true, fd.maxMVCCReadpoint > 0,
- fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
- HConstants.EMPTY_STRING);
+ return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
}
- protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
- String fileStoragePolicy, boolean major) throws IOException {
- return store.createWriterInTmp(fd.maxKeyCount,
- major ? majorCompactionCompression : minorCompactionCompression,
- true, fd.maxMVCCReadpoint > 0,
- fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
+ protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
+ String fileStoragePolicy, boolean major) throws IOException {
+ return store.getStoreEngine()
+ .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
}
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 49d3e8e..afa2429 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -45,14 +45,14 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
private final CellSinkFactory<StoreFileWriter> writerFactory =
- new CellSinkFactory<StoreFileWriter>() {
- @Override
- public StoreFileWriter createWriter(InternalScanner scanner,
- org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
- return createTmpWriter(fd, shouldDropBehind, major);
- }
- };
+ new CellSinkFactory<StoreFileWriter>() {
+ @Override
+ public StoreFileWriter createWriter(InternalScanner scanner,
+ org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+ boolean shouldDropBehind, boolean major) throws IOException {
+ return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
+ }
+ };
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
new file mode 100644
index 0000000..d4c9a86
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The default implementation for store file tracker, where we do not persist the store file list,
+ * and use listing when loading store files.
+ */
+@InterfaceAudience.Private
+class DefaultStoreFileTracker extends StoreFileTrackerBase {
+
+ public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+ StoreContext ctx) {
+ super(conf, tableName, isPrimaryReplica, ctx);
+ }
+
+ @Override
+ public List<StoreFileInfo> load() throws IOException {
+ return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
+ }
+
+ @Override
+ public boolean requireWritingToTmpDirFirst() {
+ return true;
+ }
+
+ @Override
+ protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
+ // NOOP
+ }
+
+ @Override
+ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+ Collection<StoreFileInfo> newFiles) throws IOException {
+ // NOOP
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
new file mode 100644
index 0000000..aadedc8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * An interface to define how we track the store files for a given store.
+ * <p/>
+ * Traditionally, we write a store file to a tmp directory first, and then rename it to the actual
+ * data file. Once a store file is under the data directory, we consider it 'committed', and we
+ * need to do a listing when loading store files.
+ * <p/>
+ * In the cloud age, we want to store the store files on object storage, where rename
+ * and list are not as cheap as on HDFS, especially rename. Although introducing a metadata
+ * management layer for object storage could solve the problem, we still want HBase to run on
+ * pure object storage, so here we introduce this interface to abstract how we track the store
+ * files. For the old implementation, we just persist nothing here, and do listing to load store
+ * files. When running on object storage, we could persist the store file list in a system region,
+ * or in a file on the object storage, to make it possible to write directly into the data directory
+ * to avoid renaming, and also avoid listing when loading store files.
+ * <p/>
+ * The implementation must be thread safe, as flush and compaction may occur at the same time,
+ * and we could also do multiple compactions at the same time. As the implementation may choose to
+ * persist the store file list to external storage, which could be slow, it is the duty for the
+ * callers to not call it inside a lock which may block normal read/write requests.
+ */
+@InterfaceAudience.Private
+public interface StoreFileTracker {
+
+ /**
+ * Load the store files list when opening a region.
+ */
+ List<StoreFileInfo> load() throws IOException;
+
+ /**
+ * Add new store files.
+ * <p/>
+ * Used for flush and bulk load.
+ */
+ void add(Collection<StoreFileInfo> newFiles) throws IOException;
+
+ /**
+ * Add new store files and remove compacted store files after compaction.
+ */
+ void replace(Collection<StoreFileInfo> compactedFiles, Collection<StoreFileInfo> newFiles)
+ throws IOException;
+
+ /**
+ * Create a writer for writing new store files.
+ * @return Writer for a new StoreFile
+ */
+ StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
new file mode 100644
index 0000000..2451f45
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for all store file trackers.
+ * <p/>
+ * Mainly used to place the common logic to skip persistence for secondary replicas.
+ */
+@InterfaceAudience.Private
+abstract class StoreFileTrackerBase implements StoreFileTracker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
+
+ protected final Configuration conf;
+
+ protected final TableName tableName;
+
+ protected final boolean isPrimaryReplica;
+
+ protected final StoreContext ctx;
+
+ private volatile boolean cacheOnWriteLogged;
+
+ protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+ StoreContext ctx) {
+ this.conf = conf;
+ this.tableName = tableName;
+ this.isPrimaryReplica = isPrimaryReplica;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
+ if (isPrimaryReplica) {
+ doAddNewStoreFiles(newFiles);
+ }
+ }
+
+ @Override
+ public final void replace(Collection<StoreFileInfo> compactedFiles,
+ Collection<StoreFileInfo> newFiles) throws IOException {
+ if (isPrimaryReplica) {
+ doAddCompactionResults(compactedFiles, newFiles);
+ }
+ }
+
+ private HFileContext createFileContext(Compression.Algorithm compression,
+ boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
+ if (compression == null) {
+ compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+ }
+ ColumnFamilyDescriptor family = ctx.getFamily();
+ HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
+ .withIncludesTags(includesTag).withCompression(compression)
+ .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
+ .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
+ .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+ .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
+ .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
+ .withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build();
+ return hFileContext;
+ }
+
+ @Override
+ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
+ throws IOException {
+ if (!isPrimaryReplica) {
+ throw new IllegalStateException("Should not call create writer on secondary replicas");
+ }
+ // creating new cache config for each new writer
+ final CacheConfig cacheConf = ctx.getCacheConf();
+ final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
+ long totalCompactedFilesSize = params.totalCompactedFilesSize();
+ if (params.isCompaction()) {
+ // Don't cache data on write on compactions, unless specifically configured to do so
+ // Cache only when total file size remains lower than configured threshold
+ final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
+ // if data blocks are to be cached on write
+ // during compaction, we should forcefully
+ // cache index and bloom blocks as well
+ if (cacheCompactedBlocksOnWrite &&
+ totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+ writerCacheConf.enableCacheOnWrite();
+ if (!cacheOnWriteLogged) {
+ LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
+ "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
+ cacheOnWriteLogged = true;
+ }
+ } else {
+ writerCacheConf.setCacheDataOnWrite(false);
+ if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+ // checking condition once again for logging
+ LOG.debug(
+ "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " +
+ "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
+ this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
+ }
+ }
+ } else {
+ final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
+ if (shouldCacheDataOnWrite) {
+ writerCacheConf.enableCacheOnWrite();
+ if (!cacheOnWriteLogged) {
+ LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
+ "Index blocks and Bloom filter blocks", this);
+ cacheOnWriteLogged = true;
+ }
+ }
+ }
+ Encryption.Context encryptionContext = ctx.getEncryptionContext();
+ HFileContext hFileContext = createFileContext(params.compression(),
+ params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
+ Path outputDir;
+ if (requireWritingToTmpDirFirst()) {
+ outputDir =
+ new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
+ } else {
+ throw new UnsupportedOperationException("not supported yet");
+ }
+ StoreFileWriter.Builder builder =
+ new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
+ .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
+ .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
+ .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+ .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
+ .withFileStoragePolicy(params.fileStoragePolicy());
+ return builder.build();
+ }
+
+ /**
+ * Whether the implementation of this tracker requires you to write to temp directory first, i.e,
+ * does not allow broken store files under the actual data directory.
+ */
+ protected abstract boolean requireWritingToTmpDirFirst();
+
+ protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
+
+ protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+ Collection<StoreFileInfo> newFiles) throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
new file mode 100644
index 0000000..4f7231b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory for creating store file trackers.
+ */
+@InterfaceAudience.Private
+public final class StoreFileTrackerFactory {
+
+ public static StoreFileTracker create(Configuration conf, TableName tableName,
+ boolean isPrimaryReplica, StoreContext ctx) {
+ return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
index f765b35..2112b97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -101,7 +101,6 @@ class MajorCompactionRequest {
boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
throws IOException {
-
// do we have any store files?
Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
if (storeFiles == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 8adffd3..c869e5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -210,11 +210,13 @@ public class TestIOFencing {
@Override
protected void refreshStoreSizeAndTotalBytes() throws IOException {
- try {
- r.compactionsWaiting.countDown();
- r.compactionsBlocked.await();
- } catch (InterruptedException ex) {
- throw new IOException(ex);
+ if (r != null) {
+ try {
+ r.compactionsWaiting.countDown();
+ r.compactionsBlocked.await();
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
}
super.refreshStoreSizeAndTotalBytes();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 7d6d624..f825905 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -215,8 +215,10 @@ public class TestCacheOnWriteInSchema {
@Test
public void testCacheOnWriteInSchema() throws IOException {
// Write some random data into the store
- StoreFileWriter writer = store.createWriterInTmp(Integer.MAX_VALUE,
- HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false, false);
+ StoreFileWriter writer = store.getStoreEngine()
+ .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(Integer.MAX_VALUE)
+ .compression(HFile.DEFAULT_COMPRESSION_ALGORITHM).isCompaction(false)
+ .includeMVCCReadpoint(true).includesTag(false).shouldDropBehind(false));
writeStoreFile(writer);
writer.close();
// Verify the block types of interest were cached on write
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
index d36eb11..3784876 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -65,9 +65,12 @@ public class TestDefaultStoreEngine {
DummyCompactionPolicy.class.getName());
conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
DummyStoreFlusher.class.getName());
+ HRegion mockRegion = Mockito.mock(HRegion.class);
HStore mockStore = Mockito.mock(HStore.class);
- Mockito.when(mockStore.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
- StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
+ Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
+ StoreEngine<?, ?, ?, ?> se =
+ StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
Assert.assertTrue(se instanceof DefaultStoreEngine);
Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 839cf34..0a1a8e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -5746,7 +5746,7 @@ public class TestHRegion {
Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
- .getStoreFiles(families[0]);
+ .getStoreFiles(Bytes.toString(families[0]));
Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
verifyData(secondaryRegion, 0, 1000, cq, families);
@@ -7647,7 +7647,7 @@ public class TestHRegion {
getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
for (Path newFile : newFiles) {
// Create storefile around what we wrote with a reader on it.
- HStoreFile sf = createStoreFileAndReader(newFile);
+ HStoreFile sf = storeEngine.createStoreFileAndReader(newFile);
sf.closeStoreFile(evictOnClose);
sfs.add(sf);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 2fdd627..bdee770 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -53,8 +53,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntBinaryOperator;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -315,7 +315,7 @@ public class TestHStore {
/**
* Verify that compression and data block encoding are respected by the
- * Store.createWriterInTmp() method, used on store flush.
+ * createWriter method, used on store flush.
*/
@Test
public void testCreateWriter() throws Exception {
@@ -327,9 +327,11 @@ public class TestHStore {
.build();
init(name.getMethodName(), conf, hcd);
- // Test createWriterInTmp()
- StoreFileWriter writer =
- store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
+ // Test createWriter
+ StoreFileWriter writer = store.getStoreEngine()
+ .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
+ .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
+ .includesTag(false).shouldDropBehind(false));
Path path = writer.getPath();
writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
@@ -1027,19 +1029,19 @@ public class TestHStore {
// add one more file
addStoreFile();
- HStore spiedStore = spy(store);
+ StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
// call first time after files changed
- spiedStore.refreshStoreFiles();
+ spiedStoreEngine.refreshStoreFiles();
assertEquals(2, this.store.getStorefilesCount());
- verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+ verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
// call second time
- spiedStore.refreshStoreFiles();
+ spiedStoreEngine.refreshStoreFiles();
// ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
// refreshed,
- verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+ verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
}
private long countMemStoreScanner(StoreScanner scanner) {
@@ -1650,7 +1652,7 @@ public class TestHStore {
// Do compaction
MyThread thread = new MyThread(storeScanner);
thread.start();
- store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+ store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
thread.join();
KeyValueHeap heap2 = thread.getHeap();
assertFalse(heap.equals(heap2));
@@ -1729,8 +1731,10 @@ public class TestHStore {
@Test
public void testHFileContextSetWithCFAndTable() throws Exception {
init(this.name.getMethodName());
- StoreFileWriter writer = store.createWriterInTmp(10000L,
- Compression.Algorithm.NONE, false, true, false, true);
+ StoreFileWriter writer = store.getStoreEngine()
+ .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
+ .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
+ .includesTag(false).shouldDropBehind(true));
HFileContext hFileContext = writer.getHFileWriter().getFileContext();
assertArrayEquals(family, hFileContext.getColumnFamily());
assertArrayEquals(table, hFileContext.getTableName());
@@ -3277,7 +3281,8 @@ public class TestHStore {
int currentCount = clearSnapshotCounter.incrementAndGet();
if (currentCount == 1) {
try {
- if (store.lock.isWriteLockedByCurrentThread()) {
+ if (((ReentrantReadWriteLock) store.getStoreEngine().getLock())
+ .isWriteLockedByCurrentThread()) {
shouldWait = false;
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index be52892..814299a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -245,7 +245,7 @@ public class TestRegionMergeTransactionOnCluster {
TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
int count = 0;
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
- count += hrfs.getStoreFiles(colFamily.getName()).size();
+ count += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
}
ADMIN.compactRegion(mergedRegionInfo.getRegionName());
// clean up the merged region store files
@@ -254,7 +254,7 @@ public class TestRegionMergeTransactionOnCluster {
int newcount = 0;
while (EnvironmentEdgeManager.currentTime() < timeout) {
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
- newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+ newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
}
if(newcount > count) {
break;
@@ -273,7 +273,7 @@ public class TestRegionMergeTransactionOnCluster {
while (EnvironmentEdgeManager.currentTime() < timeout) {
int newcount1 = 0;
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
- newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+ newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
}
if(newcount1 <= 1) {
break;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
index 30a005d..daec3a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -98,7 +97,7 @@ public class TestStoreFileRefresherChore {
}
@Override
- public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
+ public List<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
if (fail) {
throw new IOException("simulating FS failure");
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index f20691b..5372b84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -29,7 +29,6 @@ import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -126,13 +125,12 @@ public class TestStoreScannerClosure {
p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
region.put(p);
HStore store = region.getStore(fam);
- ReentrantReadWriteLock lock = store.lock;
// use the lock to manually get a new memstore scanner. this is what
// HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
//since it is just a testcase).
- lock.readLock().lock();
+ store.getStoreEngine().readLock();
final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
- lock.readLock().unlock();
+ store.getStoreEngine().readUnlock();
Thread closeThread = new Thread() {
public void run() {
// close should be completed
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index 7a93f89..498068b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
@@ -118,8 +118,10 @@ public class TestStripeStoreEngine {
}
private static TestStoreEngine createEngine(Configuration conf) throws Exception {
+ HRegion region = mock(HRegion.class);
HStore store = mock(HStore.class);
- when(store.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
+ when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ when(store.getHRegion()).thenReturn(region);
CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
return (TestStoreEngine)StoreEngine.create(store, conf, kvComparator);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index 92ba76d..9db1fad5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -22,9 +22,6 @@ import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.cre
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -41,14 +38,16 @@ import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
@@ -107,11 +106,10 @@ public class TestDateTieredCompactor {
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
- when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
- when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
- when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+ when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+ StoreEngine storeEngine = mock(StoreEngine.class);
+ when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+ when(store.getStoreEngine()).thenReturn(storeEngine);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index d9f32fb..8e84797 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -30,7 +30,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@@ -861,12 +862,9 @@ public class TestStripeCompactionPolicy {
when(info.getRegionNameAsString()).thenReturn("testRegion");
when(store.getColumnFamilyDescriptor()).thenReturn(col);
when(store.getRegionInfo()).thenReturn(info);
- when(
- store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
- when(
- store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+ StoreEngine storeEngine = mock(StoreEngine.class);
+ when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+ when(store.getStoreEngine()).thenReturn(storeEngine);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index 6e8b19f..c87b11f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -21,9 +21,6 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -39,13 +36,15 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
@@ -206,11 +205,10 @@ public class TestStripeCompactor {
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
- when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
- when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
- when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+ when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+ StoreEngine storeEngine = mock(StoreEngine.class);
+ when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+ when(store.getStoreEngine()).thenReturn(storeEngine);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
return new StripeCompactor(conf, store) {