Posted to commits@hbase.apache.org by zh...@apache.org on 2022/04/17 14:54:14 UTC
[hbase] branch branch-2 updated: HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new ae8eccd5c09 HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)
ae8eccd5c09 is described below
commit ae8eccd5c09d534169987390d0b957c714c257c7
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Apr 17 21:58:12 2022 +0800
HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)
Introduce a StoreFileWriterCreationTracker to track the store files being written
Signed-off-by: Josh Elser <el...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
(cherry picked from commit 48c4a4626e54724beb4ed46becbedbe292841dc4)
---
.../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 23 ++-
.../hadoop/hbase/mob/DefaultMobStoreFlusher.java | 5 +-
.../regionserver/AbstractMultiFileWriter.java | 8 +-
.../hbase/regionserver/BrokenStoreFileCleaner.java | 2 +-
.../regionserver/CreateStoreFileWriterParams.java | 14 +-
.../regionserver/DateTieredMultiFileWriter.java | 2 +-
.../hbase/regionserver/DefaultStoreFlusher.java | 7 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 180 +++++++++++++++------
.../hadoop/hbase/regionserver/StoreEngine.java | 17 +-
.../hadoop/hbase/regionserver/StoreFileWriter.java | 34 +++-
.../hadoop/hbase/regionserver/StoreFlusher.java | 22 +--
.../hbase/regionserver/StripeMultiFileWriter.java | 2 +-
.../hbase/regionserver/StripeStoreFlusher.java | 21 ++-
.../compactions/AbstractMultiOutputCompactor.java | 19 +--
.../compactions/CompactionProgress.java | 2 +-
.../compactions/CompactionRequestImpl.java | 12 +-
.../hbase/regionserver/compactions/Compactor.java | 128 ++++++++-------
.../compactions/DateTieredCompactor.java | 21 ++-
.../regionserver/compactions/DefaultCompactor.java | 19 +--
.../regionserver/compactions/StripeCompactor.java | 38 +++--
.../storefiletracker/StoreFileTrackerBase.java | 12 +-
.../hbase/regionserver/TestCompactorMemLeak.java | 5 +-
.../hadoop/hbase/regionserver/TestHStore.java | 7 +-
.../hbase/regionserver/TestMajorCompaction.java | 19 ---
.../TestSplitTransactionOnCluster.java | 17 +-
.../regionserver/wal/AbstractTestWALReplay.java | 8 +-
26 files changed, 395 insertions(+), 249 deletions(-)
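Before the per-file diffs, the heart of the change: each flush or compaction now gets its own tracker that records every store file path a writer is opened for, so the BrokenStoreFileCleaner never deletes a file that an in-flight operation is still producing. A minimal standalone sketch of that tracker, mirroring the StoreFileWriterCreationTracker added to HStore.java below (the class name here is illustrative; only hadoop's Path is assumed):

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Consumer;
    import org.apache.hadoop.fs.Path;

    // Collects every path handed to accept(). Populated on the flush or
    // compaction thread, read by the cleaner chore, hence the concurrent set.
    final class WriterCreationTrackerSketch implements Consumer<Path> {
      private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());

      @Override
      public void accept(Path path) {
        files.add(path);
      }

      Set<Path> get() {
        return Collections.unmodifiableSet(files);
      }
    }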
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index dd800f9b2c3..03b861be7a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -22,12 +22,14 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -84,10 +87,14 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
+ boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+ throws IOException {
// make this writer with tags always because of possible new cells with tags.
- return store.getStoreEngine().createWriter(
- createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+ return store.getStoreEngine()
+ .createWriter(
+ createParams(fd, shouldDropBehind, major, writerCreationTracker)
+ .includeMVCCReadpoint(true)
+ .includesTag(true));
}
};
@@ -155,18 +162,19 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* the scanner to filter the deleted cells.
* @param fd File details
* @param scanner Where to read from.
+ * @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
+ * @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
- protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
- boolean major, int numofFilesToCompact) throws IOException {
- long bytesWrittenProgressForCloseCheck = 0;
+ boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Since scanner.next() can return 'false' but still be delivering data,
@@ -370,9 +378,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
return true;
}
-
@Override
- protected List<Path> commitWriter(FileDetails fd,
+ protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index a52ce2b05c0..10ef4c450e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -100,7 +101,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
- FlushLifeCycleTracker tracker) throws IOException {
+ FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
ArrayList<Path> result = new ArrayList<>();
long cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -114,7 +115,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
- writer = createWriter(snapshot, true);
+ writer = createWriter(snapshot, true, writerCreationTracker);
IOException e = null;
try {
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index 82c3867c103..a824b501c82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -67,7 +67,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
* comments in HBASE-15400 for more details.
*/
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
- return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
+ return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
}
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
@@ -110,11 +110,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
return paths;
}
- /**
- * Returns all writers. This is used to prevent deleting currently writen storefiles
- * during cleanup.
- */
- public abstract Collection<StoreFileWriter> writers();
+ protected abstract Collection<StoreFileWriter> writers();
/**
* Subclasses override this method to be called at the end of a successful sequence of append; all
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
index 0c4807d8bad..042acb0bbc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
@@ -152,7 +152,7 @@ public class BrokenStoreFileCleaner extends ScheduledChore {
}
private boolean isCompactionResultFile(FileStatus file, HStore store) {
- return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+ return store.getStoreFilesBeingWritten().contains(file.getPath());
}
// Compacted files can still have readers and are cleaned by a separate chore, so they have to
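With this change the cleaner no longer asks the compactor for its single in-flight writer; it asks the store for the union of all paths currently being written (HStore.getStoreFilesBeingWritten(), added later in this patch). A hedged sketch of the resulting check; the helper name safeToArchive is hypothetical:

    // A candidate file is safe to delete only if no in-flight flush or
    // compaction has registered it as a writer target.
    static boolean safeToArchive(HStore store, Path candidate) {
      return !store.getStoreFilesBeingWritten().contains(candidate);
    }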
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
index 10cd9f009e4..1d45e1c51c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.util.function.Consumer;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
private String fileStoragePolicy = HConstants.EMPTY_STRING;
+ private Consumer<Path> writerCreationTracker;
+
private CreateStoreFileWriterParams() {
}
@@ -127,8 +131,16 @@ public final class CreateStoreFileWriterParams {
return this;
}
+ public Consumer<Path> writerCreationTracker() {
+ return writerCreationTracker;
+ }
+
+ public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
+ this.writerCreationTracker = writerCreationTracker;
+ return this;
+ }
+
public static CreateStoreFileWriterParams create() {
return new CreateStoreFileWriterParams();
}
-
}
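CreateStoreFileWriterParams remains a fluent builder; the tracker simply rides along as one more parameter. A sketch of how a flush-side caller might populate it (snapshot and LOG are assumed from the surrounding flusher code; the fluent methods match this file and the StoreFlusher change below):

    CreateStoreFileWriterParams params = CreateStoreFileWriterParams.create()
        .maxKeyCount(snapshot.getCellsCount())
        .isCompaction(false)
        .includeMVCCReadpoint(true)
        .includesTag(false)
        .shouldDropBehind(false)
        // any Consumer<Path> works; HStore supplies a StoreFileWriterCreationTracker
        .writerCreationTracker(path -> LOG.debug("creating writer for {}", path));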
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index 1e10eb2db23..8201cb152c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
- public Collection<StoreFileWriter> writers() {
+ protected Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 306760d7ce6..0f3daa4c177 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -44,8 +45,8 @@ public class DefaultStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
- MonitoredTask status, ThroughputController throughputController,
- FlushLifeCycleTracker tracker) throws IOException {
+ MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+ Consumer<Path> writerCreationTracker) throws IOException {
ArrayList<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -59,7 +60,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
- writer = createWriter(snapshot, false);
+ writer = createWriter(snapshot, false, writerCreationTracker);
IOException e = null;
try {
performFlush(scanner, writer, throughputController);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 41f18382073..3da95c18142 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -156,8 +158,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// rows that has cells from both memstore and files (or only files)
private LongAdder mixedRowReadsCount = new LongAdder();
- private boolean cacheOnWriteLogged;
-
/**
* Lock specific to archiving compacted store files. This avoids races around
* the combination of retrieving the list of compacted files and moving them to
@@ -215,14 +215,46 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private final StoreContext storeContext;
+ // Used to track the store files which are currently being written. For compaction, if we want to
+ // compact store files [a, b, c] to [d], we will record 'd' here. We also use it to track the
+ // store files being written when flushing.
+ // Notice that the tracker is populated in the background compaction or flush thread while the
+ // files are read from other threads, so it needs to be thread safe.
+ private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
+
+ private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ @Override
+ public void accept(Path t) {
+ files.add(t);
+ }
+
+ public Set<Path> get() {
+ return Collections.unmodifiableSet(files);
+ }
+ }
+
+ // We may have multiple compactions running at the same time, and a flush can also happen at the
+ // same time, so here we need a collection, and the collection needs to be thread safe.
+ // The implementation of StoreFileWriterCreationTracker is very simple and we are unlikely to
+ // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change this to
+ // IdentityHashMap if we later implement hashCode or equals.
+ private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ // For SFT implementations which write a tmp store file first, we do not need to clean up broken
+ // store files under the data directory, which means we do not need to track store file writer
+ // creation. So here we abstract a factory to return different trackers for different SFT
+ // implementations.
+ private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
+
/**
* Constructor
* @param family HColumnDescriptor for this column
- * @param confParam configuration object failed. Can be null.
+ * @param confParam configuration object. Can be null.
*/
protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam, boolean warmup) throws IOException {
-
this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
this.region = region;
@@ -267,6 +299,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
storeEngine.initialize(warmup);
+ // if we are required to write to the tmp dir first, we just return null, which indicates that
+ // we do not need to track the creation of store file writers; otherwise we return a new
+ // StoreFileWriterCreationTracker.
+ this.storeFileWriterCreationTrackerFactory =
+ storeEngine.requireWritingToTmpDirFirst() ? () -> null
+ : () -> new StoreFileWriterCreationTracker();
refreshStoreSizeAndTotalBytes();
flushRetriesNumber = conf.getInt(
@@ -290,7 +328,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
family.getCompressionType());
- cacheOnWriteLogged = false;
}
private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@@ -795,8 +832,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
* @throws IOException if exception occurs during process
*/
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
- MonitoredTask status, ThroughputController throughputController,
- FlushLifeCycleTracker tracker) throws IOException {
+ MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+ Consumer<Path> writerCreationTracker) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
@@ -806,8 +843,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) {
try {
- List<Path> pathNames =
- flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
+ List<Path> pathNames = flusher.flushSnapshot(
+ snapshot,
+ logCacheFlushId,
+ status,
+ throughputController,
+ tracker,
+ writerCreationTracker);
Path lastPathName = null;
try {
for (Path pathName : pathNames) {
@@ -1118,6 +1160,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
ThroughputController throughputController, User user) throws IOException {
assert compaction != null;
CompactionRequestImpl cr = compaction.getRequest();
+ StoreFileWriterCreationTracker writerCreationTracker =
+ storeFileWriterCreationTrackerFactory.get();
+ if (writerCreationTracker != null) {
+ cr.setWriterCreationTracker(writerCreationTracker);
+ storeFileWriterCreationTrackers.add(writerCreationTracker);
+ }
try {
// Do all sanity checking in here if we have a valid CompactionRequestImpl
// because we need to clean up after it on the way out in a finally
@@ -1157,18 +1205,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
replaceStoreFiles(filesToCompact, sfs, true);
- // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
- // CleanerChore know that compaction is done and the file can be cleaned up if compaction
- // have failed.
- storeEngine.resetCompactionWriter();
-
- if (cr.isMajor()) {
- majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
- majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
- } else {
- compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
- compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
- }
long outputBytes = getTotalSize(sfs);
// At this point the store will use new files for all new scanners.
@@ -1576,6 +1612,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles());
}
+ // The tracker could be null if, for example, the SFT implementation does not require tracking
+ // store file writer creation, or the compaction was canceled.
+ if (cr.getWriterCreationTracker() != null) {
+ storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
+ }
}
/**
@@ -1899,6 +1940,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private final class StoreFlusherImpl implements StoreFlushContext {
private final FlushLifeCycleTracker tracker;
+ private final StoreFileWriterCreationTracker writerCreationTracker;
private final long cacheFlushSeqNum;
private MemStoreSnapshot snapshot;
private List<Path> tempFiles;
@@ -1910,6 +1952,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
this.cacheFlushSeqNum = cacheFlushSeqNum;
this.tracker = tracker;
+ this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
}
/**
@@ -1930,41 +1973,61 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
public void flushCache(MonitoredTask status) throws IOException {
RegionServerServices rsService = region.getRegionServerServices();
ThroughputController throughputController =
- rsService == null ? null : rsService.getFlushThroughputController();
- tempFiles =
- HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
+ rsService == null ? null : rsService.getFlushThroughputController();
+ // it could be null if the SFT implementation does not require tracking the creation of store
+ // file writers.
+ if (writerCreationTracker != null) {
+ HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
+ }
+ tempFiles = HStore.this.flushCache(
+ cacheFlushSeqNum,
+ snapshot,
+ status,
+ throughputController,
+ tracker,
+ writerCreationTracker);
}
@Override
public boolean commit(MonitoredTask status) throws IOException {
- if (CollectionUtils.isEmpty(this.tempFiles)) {
- return false;
- }
- status.setStatus("Flushing " + this + ": reopening flushed file");
- List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
- for (HStoreFile sf : storeFiles) {
- StoreFileReader r = sf.getReader();
- if (LOG.isInfoEnabled()) {
- LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
- cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+ try {
+ if (CollectionUtils.isEmpty(this.tempFiles)) {
+ return false;
+ }
+ status.setStatus("Flushing " + this + ": reopening flushed file");
+ List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
+ for (HStoreFile sf : storeFiles) {
+ StoreFileReader r = sf.getReader();
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Added {}, entries={}, sequenceid={}, filesize={}",
+ sf,
+ r.getEntries(),
+ cacheFlushSeqNum,
+ TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+ }
+ outputFileSize += r.length();
+ storeSize.addAndGet(r.length());
+ totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
+ committedFiles.add(sf.getPath());
}
- outputFileSize += r.length();
- storeSize.addAndGet(r.length());
- totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
- committedFiles.add(sf.getPath());
- }
- flushedCellsCount.addAndGet(cacheFlushCount);
- flushedCellsSize.addAndGet(cacheFlushSize);
- flushedOutputFileSize.addAndGet(outputFileSize);
- // call coprocessor after we have done all the accounting above
- for (HStoreFile sf : storeFiles) {
- if (getCoprocessorHost() != null) {
- getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+ flushedCellsCount.addAndGet(cacheFlushCount);
+ flushedCellsSize.addAndGet(cacheFlushSize);
+ flushedOutputFileSize.addAndGet(outputFileSize);
+ // call coprocessor after we have done all the accounting above
+ for (HStoreFile sf : storeFiles) {
+ if (getCoprocessorHost() != null) {
+ getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+ }
+ }
+ // Add new file to store files. Clear snapshot too while we have the Store write lock.
+ return completeFlush(storeFiles, snapshot.getId());
+ } finally {
+ if (writerCreationTracker != null) {
+ HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
}
}
- // Add new file to store files. Clear snapshot too while we have the Store write lock.
- return completeFlush(storeFiles, snapshot.getId());
}
@Override
@@ -2110,6 +2173,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return majorCompactedCellsSize.get();
}
+ public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
+ if (isMajor) {
+ majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+ majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
+ } else {
+ compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+ compactedCellsSize.addAndGet(progress.totalCompactedSize);
+ }
+ }
+
/**
* Returns the StoreEngine that is backing this concrete implementation of Store.
* @return Returns the {@link StoreEngine} object used internally inside this HStore object.
@@ -2404,4 +2477,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
mixedRowReadsCount.increment();
}
}
+
+ /**
+ * Return the store files which are currently being written to. Mainly used by
+ * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not yet present
+ * in the SFT.
+ */
+ Set<Path> getStoreFilesBeingWritten() {
+ return storeFileWriterCreationTrackers.stream()
+ .flatMap(t -> t.get().stream())
+ .collect(Collectors.toSet());
+ }
}
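Taken together, the HStore changes define a simple lifecycle: obtain a tracker from the factory (null when the SFT writes to a tmp directory first), publish it before any writer is created, and retract it once the new files are committed to the SFT. A condensed sketch of that lifecycle, which both the flush and compaction paths now follow (doFlushOrCompaction is a hypothetical stand-in for the real work):

    StoreFileWriterCreationTracker tracker = storeFileWriterCreationTrackerFactory.get();
    if (tracker != null) {
      // visible to BrokenStoreFileCleaner from this point on
      storeFileWriterCreationTrackers.add(tracker);
    }
    try {
      doFlushOrCompaction(tracker);
    } finally {
      if (tracker != null) {
        // the outputs are committed (or aborted); the cleaner may act on them now
        storeFileWriterCreationTrackers.remove(tracker);
      }
    }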
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index d85553ac808..847187074df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -42,11 +42,9 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -94,7 +92,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
*/
@InterfaceAudience.Private
public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
- C extends Compactor, SFM extends StoreFileManager> {
+ C extends Compactor<?>, SFM extends StoreFileManager> {
private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
@@ -157,7 +155,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
/**
* @return Compactor to use.
*/
- public Compactor getCompactor() {
+ public Compactor<?> getCompactor() {
return this.compactor;
}
@@ -544,17 +542,6 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
return storeFileTracker.requireWritingToTmpDirFirst();
}
- /**
- * Resets the compaction writer when the new file is committed and used as active storefile.
- * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
- * CleanerChore know that compaction is done and the file can be cleaned up if compaction
- * have failed. Currently called in
- * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
- */
- public void resetCompactionWriter(){
- compactor.resetWriter();
- }
-
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/TestHStore.java")
ReadWriteLock getLock() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index a4084cbd379..829028eec63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -423,9 +424,14 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
private String fileStoragePolicy;
-
- public Builder(Configuration conf, CacheConfig cacheConf,
- FileSystem fs) {
+ // this is used to track the creation of the StoreFileWriter, mainly for SFT implementations
+ // where we write store files directly to the final place instead of writing a tmp file first.
+ // Under this scenario, a background task purges store files which are not recorded in the SFT,
+ // and since newly created store files are not yet tracked in the SFT, we need to record them
+ // here and treat them specially.
+ private Consumer<Path> writerCreationTracker;
+
+ public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
this.conf = conf;
this.cacheConf = cacheConf;
this.fs = fs;
@@ -509,6 +515,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
return this;
}
+ public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
+ this.writerCreationTracker = writerCreationTracker;
+ return this;
+ }
+
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@@ -557,8 +568,21 @@ public class StoreFileWriter implements CellSink, ShipperListener {
bloomType = BloomType.NONE;
}
}
-
- return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
+ // make sure we call this before actually creating the writer
+ // in fact, it is not a big deal to add a nonexistent file to the tracker, as we will never
+ // try to delete it and we will clean the tracker up after compaction anyway. But if the file
+ // cleaner finds the file before we have recorded it, it may accidentally delete the file
+ // and cause problems.
+ if (writerCreationTracker != null) {
+ writerCreationTracker.accept(filePath);
+ }
+ return new StoreFileWriter(
+ fs,
+ filePath,
+ conf,
+ cacheConf,
+ bloomType,
+ maxKeyCount,
favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
}
}
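Note the ordering guarantee above: the builder invokes the tracker before the writer (and hence the file) exists, closing the race with the cleaner. A sketch of wiring a tracker in through the builder; apart from withWriterCreationTracker, the builder methods shown are from the existing API, and conf, cacheConf, fs, filePath and tracker are assumed from the caller:

    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
        .withFilePath(filePath)
        // tracker.accept(filePath) runs inside build(), before the writer is created
        .withWriterCreationTracker(tracker)
        .build();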
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 1095854273a..d6461f7729a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
-
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -55,8 +55,8 @@ abstract class StoreFlusher {
* @return List of files written. Can be empty; must not be null.
*/
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
- MonitoredTask status, ThroughputController throughputController,
- FlushLifeCycleTracker tracker) throws IOException;
+ MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+ Consumer<Path> writerCreationTracker) throws IOException;
protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
@@ -69,13 +69,17 @@ abstract class StoreFlusher {
writer.close();
}
- protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
- throws IOException {
+ protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag,
+ Consumer<Path> writerCreationTracker) throws IOException {
return store.getStoreEngine()
- .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
- .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
- .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
- .shouldDropBehind(false));
+ .createWriter(
+ CreateStoreFileWriterParams.create()
+ .maxKeyCount(snapshot.getCellsCount())
+ .compression(store.getColumnFamilyDescriptor().getCompressionType())
+ .isCompaction(false)
+ .includeMVCCReadpoint(true)
+ .includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
+ .shouldDropBehind(false).writerCreationTracker(writerCreationTracker));
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index a4e943ac8b0..fc0598d89ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
- public Collection<StoreFileWriter> writers() {
+ protected Collection<StoreFileWriter> writers() {
return existingWriters;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index f8183b7645a..fb9115e01ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -23,7 +23,7 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
@@ -54,11 +54,14 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
- MonitoredTask status, ThroughputController throughputController,
- FlushLifeCycleTracker tracker) throws IOException {
+ MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+ Consumer<Path> writerCreationTracker) throws IOException {
List<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount();
- if (cellsCount == 0) return result; // don't flush if there are no entries
+ if (cellsCount == 0) {
+ // don't flush if there are no entries
+ return result;
+ }
InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
@@ -70,8 +73,9 @@ public class StripeStoreFlusher extends StoreFlusher {
StripeMultiFileWriter mw = null;
try {
mw = req.createWriter(); // Writer according to the policy.
- StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
- StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
+ StripeMultiFileWriter.WriterFactory factory =
+ createWriterFactory(snapshot, writerCreationTracker);
+ StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
mw.init(storeScanner, factory);
synchronized (flushLock) {
@@ -98,12 +102,13 @@ public class StripeStoreFlusher extends StoreFlusher {
return result;
}
- private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
+ private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot,
+ Consumer<Path> writerCreationTracker) {
return new StripeMultiFileWriter.WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
// XXX: it used to always pass true for includesTag, re-consider?
- return StripeStoreFlusher.this.createWriter(snapshot, true);
+ return StripeStoreFlusher.this.createWriter(snapshot, true, writerCreationTracker);
}
};
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 19b7a98627e..23d16934b65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
-
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,19 +46,21 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
super(conf, store);
}
- protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
- final FileDetails fd, final boolean shouldDropBehind, boolean major) {
+ protected final void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
+ final FileDetails fd, final boolean shouldDropBehind, boolean major,
+ Consumer<Path> writerCreationTracker) {
WriterFactory writerFactory = new WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
- return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
+ return AbstractMultiOutputCompactor.this
+ .createWriter(fd, shouldDropBehind, major, writerCreationTracker);
}
@Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
- return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
- fileStoragePolicy, major);
+ return AbstractMultiOutputCompactor.this
+ .createWriter(fd, shouldDropBehind, fileStoragePolicy, major, writerCreationTracker);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
@@ -68,7 +70,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
}
@Override
- protected void abortWriter() throws IOException {
+ protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
@@ -79,7 +81,6 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
e);
}
}
- //this step signals that the target file is no longer writen and can be cleaned up
- writer = null;
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
index 942cc4f3fd6..2ccdd150cd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
@@ -37,7 +37,7 @@ public class CompactionProgress {
private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);
/** the total compacting key values in currently running compaction */
- private long totalCompactingKVs;
+ public long totalCompactingKVs;
/** the completed count of key values in currently running compaction */
public long currentCompactedKVs = 0;
/** the total size of data processed by the currently running compaction, in bytes */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
index 899219d70b2..5d8285aecdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
@@ -22,8 +22,9 @@ import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import java.util.Collection;
import java.util.Collections;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
-
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -51,6 +52,7 @@ public class CompactionRequestImpl implements CompactionRequest {
private String storeName = "";
private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
+ private Consumer<Path> writerCreationTracker;
public CompactionRequestImpl(Collection<HStoreFile> files) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
@@ -137,6 +139,14 @@ public class CompactionRequestImpl implements CompactionRequest {
return tracker;
}
+ public Consumer<Path> getWriterCreationTracker() {
+ return writerCreationTracker;
+ }
+
+ public void setWriterCreationTracker(Consumer<Path> writerCreationTracker) {
+ this.writerCreationTracker = writerCreationTracker;
+ }
+
public boolean isAfterSplit() {
return isAfterSplit;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index d934ecb0c16..5ac64823ae0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -25,12 +25,13 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
-import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -70,15 +70,18 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
* A compactor is a compaction algorithm associated with a given policy. Base class also contains
* reusable parts for implementing compactors (what is common and what isn't is evolving).
+ * <p>
+ * Compactions might be concurrent against a given store and the Compactor is shared among
+ * them. Do not put mutable state into class fields. All Compactor class fields should be
+ * final or effectively final.
+ * 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
*/
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
- protected volatile CompactionProgress progress;
protected final Configuration conf;
protected final HStore store;
-
protected final int compactionKVMax;
protected final Compression.Algorithm majorCompactionCompression;
protected final Compression.Algorithm minorCompactionCompression;
@@ -92,15 +95,15 @@ public abstract class Compactor<T extends CellSink> {
protected static final String MINOR_COMPACTION_DROP_CACHE =
"hbase.regionserver.minorcompaction.pagecache.drop";
- private final boolean dropCacheMajor;
- private final boolean dropCacheMinor;
+ protected final boolean dropCacheMajor;
+ protected final boolean dropCacheMinor;
- // In compaction process only a single thread will access and write to this field, and
- // getCompactionTargets is the only place we will access it other than the compaction thread, so
- // make it volatile.
- protected volatile T writer = null;
+ // We track progress per compaction, keyed on the identity of each CompactionProgress object.
+ // The finally block in compact() cleans up this state.
+ private final Set<CompactionProgress> progressSet =
+ Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
- //TODO: depending on Store is not good but, realistically, all compactors currently do.
+ // TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) {
this.conf = conf;
this.store = store;
@@ -116,15 +119,9 @@ public abstract class Compactor<T extends CellSink> {
this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
}
-
-
protected interface CellSinkFactory<S> {
- S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major)
- throws IOException;
- }
-
- public CompactionProgress getProgress() {
- return this.progress;
+ S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
+ Consumer<Path> writerCreationTracker) throws IOException;
}
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
@@ -271,12 +268,13 @@ public abstract class Compactor<T extends CellSink> {
};
protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
- boolean major) {
+ boolean major, Consumer<Path> writerCreationTracker) {
return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
.compression(major ? majorCompactionCompression : minorCompactionCompression)
.isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
.includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
- .totalCompactedFilesSize(fd.totalCompactedFilesSize);
+ .totalCompactedFilesSize(fd.totalCompactedFilesSize)
+ .writerCreationTracker(writerCreationTracker);
}
/**
@@ -286,16 +284,20 @@ public abstract class Compactor<T extends CellSink> {
* @throws IOException if creation failed
*/
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
- boolean major) throws IOException {
+ boolean major, Consumer<Path> writerCreationTracker) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
- return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
+ return store.getStoreEngine()
+ .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
}
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
- String fileStoragePolicy, boolean major) throws IOException {
+ String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
+ throws IOException {
return store.getStoreEngine()
- .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
+ .createWriter(
+ createParams(fd, shouldDropBehind, major, writerCreationTracker)
+ .fileStoragePolicy(fileStoragePolicy));
}
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
@@ -327,7 +329,6 @@ public abstract class Compactor<T extends CellSink> {
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
- this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
@@ -343,6 +344,9 @@ public abstract class Compactor<T extends CellSink> {
boolean finished = false;
List<StoreFileScanner> scanners =
createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
+ T writer = null;
+ CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
+ progressSet.add(progress);
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
@@ -355,14 +359,14 @@ public abstract class Compactor<T extends CellSink> {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
- if (writer != null){
- LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
- .map(n -> n.toString())
- .collect(Collectors.joining(", ", "{ ", " }")));
- }
- writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
- finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
- throughputController, request.isAllFiles(), request.getFiles().size());
+ writer = sinkFactory.createWriter(
+ scanner,
+ fd,
+ dropCache,
+ request.isMajor(),
+ request.getWriterCreationTracker());
+ finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+ throughputController, request.isAllFiles(), request.getFiles().size(), progress);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@@ -380,34 +384,41 @@ public abstract class Compactor<T extends CellSink> {
} else {
Closeables.close(scanner, true);
}
- if (!finished && writer != null) {
- abortWriter();
+ if (!finished) {
+ if (writer != null) {
+ abortWriter(writer);
+ }
+ } else {
+ store.updateCompactedMetrics(request.isMajor(), progress);
}
+ progressSet.remove(progress);
}
assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error";
- return commitWriter(fd, request);
+ return commitWriter(writer, fd, request);
}
- protected abstract List<Path> commitWriter(FileDetails fd,
+ protected abstract List<Path> commitWriter(T writer, FileDetails fd,
CompactionRequestImpl request) throws IOException;
- protected abstract void abortWriter() throws IOException;
+ protected abstract void abortWriter(T writer) throws IOException;
/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
* @param scanner Where to read from.
+ * @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
* smallestReadPoint
* @param major Is a major compaction.
* @param numofFilesToCompact the number of files to compact
+ * @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
- protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
- boolean major, int numofFilesToCompact) throws IOException {
+ boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
assert writer instanceof ShipperListener;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
@@ -549,22 +560,27 @@ public abstract class Compactor<T extends CellSink> {
dropDeletesFromRow, dropDeletesToRow);
}
- public List<Path> getCompactionTargets() {
- T writer = this.writer;
- if (writer == null) {
- return Collections.emptyList();
- }
- if (writer instanceof StoreFileWriter) {
- return Arrays.asList(((StoreFileWriter) writer).getPath());
+ /**
+ * Return the aggregate progress for all currently active compactions.
+ */
+ public CompactionProgress getProgress() {
+ synchronized (progressSet) {
+ long totalCompactingKVs = 0;
+ long currentCompactedKVs = 0;
+ long totalCompactedSize = 0;
+ for (CompactionProgress progress: progressSet) {
+ totalCompactingKVs += progress.totalCompactingKVs;
+ currentCompactedKVs += progress.currentCompactedKVs;
+ totalCompactedSize += progress.totalCompactedSize;
+ }
+ CompactionProgress result = new CompactionProgress(totalCompactingKVs);
+ result.currentCompactedKVs = currentCompactedKVs;
+ result.totalCompactedSize = totalCompactedSize;
+ return result;
}
- return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
- .collect(Collectors.toList());
}
- /**
- * Reset the Writer when the new storefiles were successfully added
- */
- public void resetWriter(){
- writer = null;
+ public boolean isCompacting() {
+ return !progressSet.isEmpty();
}
}
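The `writer` field and the single `progress` field are gone; each call to compact() owns its writer and its CompactionProgress, and the only shared state is the identity-based progressSet. A sketch of why the identity set works and how getProgress() aggregates (the numbers are illustrative):

    import java.util.Collections;
    import java.util.IdentityHashMap;
    import java.util.Set;

    static void progressSketch() {
      Set<CompactionProgress> progressSet =
          Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
      // Two concurrent compactions on the same store each register their own progress;
      // identity semantics matter because CompactionProgress defines no equals/hashCode.
      CompactionProgress p1 = new CompactionProgress(100); // maxKeyCount of request 1
      CompactionProgress p2 = new CompactionProgress(50);  // maxKeyCount of request 2
      progressSet.add(p1);
      progressSet.add(p2);
      // getProgress() iterates under the set's lock and sums totalCompactingKVs,
      // currentCompactedKVs and totalCompactedSize into a fresh snapshot object.
    }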
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index 43e037c5e70..c8c10e16ff1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
-
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
@@ -68,21 +68,26 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
@Override
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
- DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
- lowerBoundariesPolicies,
- needEmptyFile(request));
- initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+ boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+ throws IOException {
+ DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(
+ lowerBoundaries,
+ lowerBoundariesPolicies,
+ needEmptyFile(request));
+ initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer;
}
- }, throughputController, user);
+ },
+ throughputController,
+ user);
}
@Override
- protected List<Path> commitWriter(FileDetails fd,
+ protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> pathList =
writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
return pathList;
}
+
}
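
The Consumer<Path> threaded through createWriter and initMultiWriter above is deliberately opaque to the compactor: the store hands in a callback, and every writer the compaction opens reports its output path through it. A minimal sketch of that hand-off, assuming the store keeps the in-flight paths in a concurrent set; java.nio.file.Path stands in for the Hadoop Path used in the diff.

    import java.nio.file.Path;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Consumer;

    final class WriterCreationTrackingSketch {
      // Owned by the store; the compactor never sees this set directly.
      private final Set<Path> filesBeingWritten = ConcurrentHashMap.newKeySet();

      // Passed into createWriter(...) as the writerCreationTracker argument.
      Consumer<Path> newTracker() {
        return filesBeingWritten::add;
      }

      // Cleanup logic can ask whether a file is still being written before
      // treating it as leftover; this purpose is inferred from the parameter
      // name, not shown in this hunk.
      boolean isBeingWritten(Path path) {
        return filesBeingWritten.contains(path);
      }
    }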
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 03e3a1b5f39..0e91d8870b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -47,10 +48,11 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
- public StoreFileWriter createWriter(InternalScanner scanner,
- org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
- return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
+ public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+ boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+ throws IOException {
+ return DefaultCompactor.this
+ .createWriter(fd, shouldDropBehind, major, writerCreationTracker);
}
};
@@ -63,7 +65,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
@Override
- protected List<Path> commitWriter(FileDetails fd,
+ protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -72,12 +74,6 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
@Override
- protected void abortWriter() throws IOException {
- abortWriter(writer);
- // this step signals that the target file is no longer written and can be cleaned up
- writer = null;
- }
-
protected final void abortWriter(StoreFileWriter writer) throws IOException {
Path leftoverFile = writer.getPath();
try {
@@ -92,4 +88,5 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
leftoverFile, e);
}
}
+
}
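
Note what this hunk removes: the no-argument abortWriter() that read and nulled a shared writer field. With commitWriter and abortWriter both taking the writer explicitly, two compactions running through the same compactor instance can no longer clobber each other's writer. A compilable sketch of the resulting lifecycle, with illustrative types rather than the actual Compactor internals:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Collections;
    import java.util.List;

    final class ExplicitWriterLifecycleSketch {
      interface Writer {
        Path path();
        void close() throws IOException;
      }

      List<Path> compact(Writer writer) throws IOException {
        boolean committed = false;
        try {
          // ... write and flush cells into the writer ...
          committed = true;
          return Collections.singletonList(writer.path());
        } finally {
          if (!committed) {
            abortWriter(writer);
          }
        }
      }

      // Mirrors the surviving abortWriter(StoreFileWriter) above: close the
      // writer, then delete the half-written output file.
      private void abortWriter(Writer writer) throws IOException {
        Path leftover = writer.path();
        writer.close();
        Files.deleteIfExists(leftover);
      }
    }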
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 060a11b41fe..6413a304d55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.List;
-
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -88,18 +88,26 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}
LOG.debug(sb.toString());
}
- return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
+ return compact(
+ request,
+ new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
new CellSinkFactory<StripeMultiFileWriter>() {
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
+ boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+ throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
- store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
- initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+ store.getComparator(),
+ targetBoundaries,
+ majorRangeFromRow,
+ majorRangeToRow);
+ initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer;
}
- }, throughputController, user);
+ },
+ throughputController,
+ user);
}
public List<Path> compact(CompactionRequestImpl request, final int targetCount, final long targetSize,
@@ -115,20 +123,28 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
+ boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+ throws IOException {
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
- store.getComparator(), targetCount, targetSize, left, right);
- initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+ store.getComparator(),
+ targetCount,
+ targetSize,
+ left,
+ right);
+ initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
return writer;
}
- }, throughputController, user);
+ },
+ throughputController,
+ user);
}
@Override
- protected List<Path> commitWriter(FileDetails fd,
+ protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index 1bf354f00a0..f3e62670796 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -177,11 +177,15 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
}
StoreFileWriter.Builder builder =
new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
- .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
- .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
- .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+ .withOutputDir(outputDir)
+ .withBloomType(ctx.getBloomFilterType())
+ .withMaxKeyCount(params.maxKeyCount())
+ .withFavoredNodes(ctx.getFavoredNodes())
+ .withFileContext(hFileContext)
+ .withShouldDropCacheBehind(params.shouldDropBehind())
.withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
- .withFileStoragePolicy(params.fileStoragePolicy());
+ .withFileStoragePolicy(params.fileStoragePolicy())
+ .withWriterCreationTracker(params.writerCreationTracker());
return builder.build();
}
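
The builder chain above gains exactly one call, withWriterCreationTracker(params.writerCreationTracker()), so the tracker travels from the flush or compaction entry point down to where the output file's path first exists. A minimal builder sketch follows; that the tracker fires when the writer is constructed is an assumption of this sketch, not something the hunk shows.

    import java.nio.file.Path;
    import java.util.function.Consumer;

    final class TrackedWriterBuilderSketch {
      private Path outputFile;
      private Consumer<Path> writerCreationTracker = p -> { };

      TrackedWriterBuilderSketch withOutputFile(Path outputFile) {
        this.outputFile = outputFile;
        return this;
      }

      TrackedWriterBuilderSketch withWriterCreationTracker(Consumer<Path> tracker) {
        this.writerCreationTracker = tracker;
        return this;
      }

      Path build() {
        // Assumed behavior: announce the path before any bytes are written,
        // so the store knows the file is in flight from the very start.
        writerCreationTracker.accept(outputFile);
        return outputFile; // stand-in for returning the real writer
      }
    }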
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
index 1b76c52cd6e..eaf40616711 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
@@ -128,13 +128,14 @@ public class TestCompactorMemLeak {
}
@Override
- protected List<Path> commitWriter(FileDetails fd,
+ protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
Cell cell = writerImpl.getLastCell();
// The cell should be backed by a KeyOnlyKeyValue.
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
- return super.commitWriter(fd, request);
+ return super.commitWriter(writer, fd, request);
}
}
+
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index ea7ff61986b..80dbe28af3d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -97,6 +98,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
@@ -2491,9 +2493,10 @@ public class TestHStore {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
- FlushLifeCycleTracker tracker) throws IOException {
+ FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
counter.incrementAndGet();
- return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+ return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+ writerCreationTracker);
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index dc699a62c0c..b9cf81f51df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -211,26 +209,9 @@ public class TestMajorCompaction {
Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100));
assertEquals(compactionThreshold, result.size());
- // see if CompactionProgress is in place but null
- for (HStore store : r.getStores()) {
- assertNull(store.getCompactionProgress());
- }
-
r.flush(true);
r.compact(true);
- // see if CompactionProgress has done its thing on at least one store
- int storeCount = 0;
- for (HStore store : r.getStores()) {
- CompactionProgress progress = store.getCompactionProgress();
- if (progress != null) {
- ++storeCount;
- assertTrue(progress.currentCompactedKVs > 0);
- assertTrue(progress.getTotalCompactingKVs() > 0);
- }
- assertTrue(storeCount > 0);
- }
-
// look at the second row
// Increment the least significant character so we get to next row.
byte[] secondRowBytes = START_KEY_BYTES.clone();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 8738b9a554f..afff1a68389 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -92,7 +92,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -118,11 +117,9 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -621,17 +618,11 @@ public class TestSplitTransactionOnCluster {
assertEquals(1, region.getStores().size());
HStore store = region.getStores().get(0);
while (store.hasReferences()) {
- // Wait on any current compaction to complete first.
- CompactionProgress progress = store.getCompactionProgress();
- if (progress != null && progress.getProgressPct() < 1.0f) {
- while (progress.getProgressPct() < 1.0f) {
- LOG.info("Waiting, progress={}", progress.getProgressPct());
- Threads.sleep(1000);
- }
- } else {
- // Run new compaction. Shoudn't be any others running.
- region.compact(true);
+ while (store.storeEngine.getCompactor().isCompacting()) {
+ Threads.sleep(100);
}
+ // Run new compaction. Shouldn't be any others running.
+ region.compact(true);
store.closeAndArchiveCompactedFiles();
}
}
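
The rewritten loop waits on the new isCompacting() liveness check instead of polling a CompactionProgress that may be null. The same wait, sketched as a bounded helper; the timeout is an addition of this sketch, since the test above spins without one:

    import java.util.function.BooleanSupplier;

    final class CompactionWaitSketch {
      // Roughly what the rewritten test loop does, plus a deadline so a hung
      // compaction fails fast instead of spinning forever.
      static void waitForCompactions(BooleanSupplier isCompacting, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (isCompacting.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IllegalStateException("Timed out waiting for compactions");
          }
          Thread.sleep(100);
        }
      }
    }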
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index fd5404c4242..0f18b16fe48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -43,6 +43,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -67,8 +68,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@@ -654,11 +653,12 @@ public abstract class AbstractTestWALReplay {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
- FlushLifeCycleTracker tracker) throws IOException {
+ FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests");
}
- return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+ return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+ writerCreationTracker);
}
};
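
Both test overrides in this section change the same way: flushSnapshot grows a Consumer<Path> parameter, and the override's only job is to thread it through to super. A reduced sketch of that delegation pattern, with the types trimmed to the essentials; the real signature also carries the snapshot, status, throughput controller, and flush tracker arguments, and java.nio.file.Path again stands in for the Hadoop Path.

    import java.nio.file.Path;
    import java.util.Collections;
    import java.util.List;
    import java.util.function.Consumer;

    class BaseFlusherSketch {
      List<Path> flushSnapshot(long cacheFlushId, Consumer<Path> writerCreationTracker) {
        return Collections.emptyList(); // stand-in for the real flush
      }
    }

    final class CountingFlusherSketch extends BaseFlusherSketch {
      int flushes;

      @Override
      List<Path> flushSnapshot(long cacheFlushId, Consumer<Path> writerCreationTracker) {
        flushes++; // observe the call, as the TestHStore override above does
        return super.flushSnapshot(cacheFlushId, writerCreationTracker);
      }
    }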