You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/07/17 09:41:15 UTC
[hbase] branch branch-2 updated: HBASE-24382 Flush partial stores
of region filtered by seqId when archive wal due to too many wals (#2049)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new bf368a0 HBASE-24382 Flush partial stores of region filtered by seqId when archive wal due to too many wals (#2049)
bf368a0 is described below
commit bf368a01bcbc2863e5376a49d969e20281c4c868
Author: bsglz <18...@qq.com>
AuthorDate: Fri Jul 17 17:40:50 2020 +0800
HBASE-24382 Flush partial stores of region filtered by seqId when archive wal due to too many wals (#2049)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../hbase/master/region/MasterRegionWALRoller.java | 7 +-
.../hadoop/hbase/regionserver/FlushPolicy.java | 1 -
.../hadoop/hbase/regionserver/FlushRequester.java | 20 +++--
.../apache/hadoop/hbase/regionserver/HRegion.java | 48 +++++++++---
.../hadoop/hbase/regionserver/HRegionServer.java | 2 +-
.../hadoop/hbase/regionserver/LogRoller.java | 8 +-
.../hadoop/hbase/regionserver/MemStoreFlusher.java | 41 +++++------
.../hbase/regionserver/wal/AbstractFSWAL.java | 30 +++++---
.../regionserver/wal/SequenceIdAccounting.java | 22 +++---
.../apache/hadoop/hbase/wal/AbstractWALRoller.java | 17 +++--
.../hadoop/hbase/wal/DisabledWALProvider.java | 4 +-
.../main/java/org/apache/hadoop/hbase/wal/WAL.java | 9 ++-
.../regionserver/TestFailedAppendAndSync.java | 7 +-
.../hbase/regionserver/TestFlushRegionEntry.java | 4 +-
.../hbase/regionserver/TestHeapMemoryManager.java | 85 ++++++++++++----------
.../hbase/regionserver/TestSplitWalDataLoss.java | 2 +-
.../hbase/regionserver/wal/AbstractTestFSWAL.java | 76 ++++++++++++++-----
.../regionserver/wal/AbstractTestWALReplay.java | 12 ++-
.../hadoop/hbase/regionserver/wal/TestFSHLog.java | 2 +-
.../regionserver/wal/TestSequenceIdAccounting.java | 2 +-
20 files changed, 252 insertions(+), 147 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java
index e18aa0c..ef3dd12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.region;
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,8 +40,8 @@ import org.slf4j.LoggerFactory;
* roller logic by our own.
* <p/>
* We can reuse most of the code for normal wal roller, the only difference is that there is only
- * one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master
- * local region.
+ * one region, so in {@link #scheduleFlush(String, List)} method we can just schedule flush
+ * for the master local region.
*/
@InterfaceAudience.Private
public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
@@ -79,7 +80,7 @@ public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
}
@Override
- protected void scheduleFlush(String encodedRegionName) {
+ protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
if (flusher != null) {
flusher.requestFlush();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
index fecbd2f..66bd095 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -45,5 +45,4 @@ public abstract class FlushPolicy extends Configured {
* @return the stores need to be flushed.
*/
public abstract Collection<HStore> selectStoresToFlush();
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index 4191fbf..92aed2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
+import java.util.List;
+
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -30,22 +32,28 @@ public interface FlushRequester {
* Tell the listener the cache needs to be flushed.
*
* @param region the Region requesting the cache flush
- * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
- * rolling.
* @return true if our region is added into the queue, false otherwise
*/
- boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
+ boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker);
+
+ /**
+ * Tell the listener the cache needs to be flushed.
+ *
+ * @param region the Region requesting the cache flush
+ * @param families stores of the region to flush; if null, the flush policy selects them
+ * @return true if our region is added into the queue, false otherwise
+ */
+ boolean requestFlush(HRegion region, List<byte[]> families,
+ FlushLifeCycleTracker tracker);
/**
* Tell the listener the cache needs to be flushed after a delay
*
* @param region the Region requesting the cache flush
* @param delay after how much time should the flush happen
- * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
- * rolling.
* @return true if our region is added into the queue, false otherwise
*/
- boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
+ boolean requestDelayedFlush(HRegion region, long delay);
/**
* Register a FlushRequestListener
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 487deea..1ff4f38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2322,7 +2322,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
- * @param force whether we want to force a flush of all stores
+ * @param flushAllStores whether we want to force a flush of all stores
* @return FlushResult indicating whether the flush was successful or not and if
* the region needs compacting
*
@@ -2330,8 +2330,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* because a snapshot was not properly persisted.
*/
// TODO HBASE-18905. We might have to expose a requestFlush API for CPs
- public FlushResult flush(boolean force) throws IOException {
- return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
+ public FlushResult flush(boolean flushAllStores) throws IOException {
+ return flushcache(flushAllStores, false, FlushLifeCycleTracker.DUMMY);
}
public interface FlushResult {
@@ -2354,6 +2354,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean isCompactionNeeded();
}
+ public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
+ FlushLifeCycleTracker tracker) throws IOException {
+ List families = null;
+ if (flushAllStores) {
+ families = new ArrayList();
+ families.addAll(this.getTableDescriptor().getColumnFamilyNames());
+ }
+ return this.flushcache(families, writeFlushRequestWalMarker, tracker);
+ }
+
/**
* Flush the cache.
*
@@ -2367,7 +2377,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
- * @param forceFlushAllStores whether we want to flush all stores
+ * @param families stores of region to flush.
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
* @param tracker used to track the life cycle of this flush
* @return whether the flush is success and whether the region needs compacting
@@ -2377,8 +2387,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
- public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
- FlushLifeCycleTracker tracker) throws IOException {
+ public FlushResultImpl flushcache(List<byte[]> families,
+ boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
@@ -2425,8 +2435,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
try {
- Collection<HStore> specificStoresToFlush =
- forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
+ // We do not always defer to flushPolicy: when the flush is requested by the
+ // log roller, we have to flush the stores that must be flushed, rather than
+ // the stores that merely could be flushed.
+ Collection<HStore> specificStoresToFlush = null;
+ if (families != null) {
+ specificStoresToFlush = getSpecificStores(families);
+ } else {
+ specificStoresToFlush = flushPolicy.selectStoresToFlush();
+ }
FlushResultImpl fs =
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
@@ -2457,6 +2474,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
+ * Get the stores that match the specified families.
+ *
+ * @return the stores that need to be flushed.
+ */
+ private Collection<HStore> getSpecificStores(List<byte[]> families) {
+ Collection<HStore> specificStoresToFlush = new ArrayList<>();
+ for (byte[] family : families) {
+ specificStoresToFlush.add(stores.get(family));
+ }
+ return specificStoresToFlush;
+ }
+
+ /**
* Should the store be flushed because it is old enough.
* <p>
* Every FlushPolicy should call this to determine whether a store is old enough to flush (except
@@ -8916,7 +8946,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
if (shouldFlush) {
// Make request outside of synchronize block; HBASE-818.
- this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
+ this.rsServices.getFlushRequester().requestFlush(this, tracker);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 20af5d9..a726de6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1857,7 +1857,7 @@ public class HRegionServer extends Thread implements
//Throttle the flushes by putting a delay. If we don't throttle, and there
//is a balanced write-load on the regions in a table, we might end up
//overwhelming the filesystem with too many flushes at once.
- if (requester.requestDelayedFlush(r, randomDelay, false)) {
+ if (requester.requestDelayedFlush(r, randomDelay)) {
LOG.info("{} requesting flush of {} because {} after random delay {} ms",
getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(),
randomDelay);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index f5049c9..58ac82e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.util.List;
import java.util.Map;
+
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,7 +47,7 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
super("LogRoller", services.getConfiguration(), services);
}
- protected void scheduleFlush(String encodedRegionName) {
+ protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
RegionServerServices services = this.abortable;
HRegion r = (HRegion) services.getRegion(encodedRegionName);
if (r == null) {
@@ -58,8 +60,8 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
encodedRegionName, r);
return;
}
- // force flushing all stores to clean old logs
- requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
+ // flush specified stores to clean old logs
+ requester.requestFlush(r, families, FlushLifeCycleTracker.DUMMY);
}
@VisibleForTesting
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 30ae092..1537f7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -285,7 +285,7 @@ class MemStoreFlusher implements FlushRequester {
server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
", Region memstore size=" +
TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
- flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
+ flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
@@ -453,13 +453,18 @@ class MemStoreFlusher implements FlushRequester {
}
@Override
- public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
- FlushLifeCycleTracker tracker) {
+ public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
+ return this.requestFlush(r, null, tracker);
+ }
+
+ @Override
+ public boolean requestFlush(HRegion r, List<byte[]> families,
+ FlushLifeCycleTracker tracker) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
- FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
+ FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
r.incrementFlushesQueuedCount();
@@ -472,12 +477,12 @@ class MemStoreFlusher implements FlushRequester {
}
@Override
- public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
+ public boolean requestDelayedFlush(HRegion r, long delay) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has some delay
FlushRegionEntry fqe =
- new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
+ new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
fqe.requeue(delay);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
@@ -576,7 +581,7 @@ class MemStoreFlusher implements FlushRequester {
return true;
}
}
- return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
+ return flushRegion(region, false, fqe.families, fqe.getTracker());
}
/**
@@ -586,13 +591,13 @@ class MemStoreFlusher implements FlushRequester {
* needs to be removed from the flush queue. If false, when we were called
* from the main flusher run loop and we got the entry to flush by calling
* poll on the flush queue (which removed it).
- * @param forceFlushAllStores whether we want to flush all store.
+ * @param families stores of region to flush.
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the region was
* not flushed.
*/
- private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
- FlushLifeCycleTracker tracker) {
+ private boolean flushRegion(HRegion region, boolean emergencyFlush,
+ List<byte[]> families, FlushLifeCycleTracker tracker) {
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
@@ -607,7 +612,7 @@ class MemStoreFlusher implements FlushRequester {
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
- FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
+ FlushResult flushResult = region.flushcache(families, false, tracker);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit().isPresent();
@@ -840,15 +845,16 @@ class MemStoreFlusher implements FlushRequester {
private long whenToExpire;
private int requeueCount = 0;
- private final boolean forceFlushAllStores;
+ private final List<byte[]> families;
private final FlushLifeCycleTracker tracker;
- FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
+ FlushRegionEntry(final HRegion r, List<byte[]> families,
+ FlushLifeCycleTracker tracker) {
this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime;
- this.forceFlushAllStores = forceFlushAllStores;
+ this.families = families;
this.tracker = tracker;
}
@@ -868,13 +874,6 @@ class MemStoreFlusher implements FlushRequester {
return this.requeueCount;
}
- /**
- * @return whether we need to flush all stores.
- */
- public boolean isForceFlushAllStores() {
- return forceFlushAllStores;
- }
-
public FlushLifeCycleTracker getTracker() {
return tracker;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index a978dbe..447eff4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -527,7 +528,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
@Override
- public byte[][] rollWriter() throws FailedLogCloseException, IOException {
+ public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
return rollWriter(false);
}
@@ -622,10 +623,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
* check the first (oldest) WAL, and return those regions which should be flushed so that
* it can be let-go/'archived'.
- * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
+ * @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file.
*/
- byte[][] findRegionsToForceFlush() throws IOException {
- byte[][] regions = null;
+ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
+ Map<byte[], List<byte[]>> regions = null;
int logCount = getNumRolledLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
@@ -633,15 +634,20 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
}
if (regions != null) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < regions.length; i++) {
- if (i > 0) {
- sb.append(", ");
+ List<String> listForPrint = new ArrayList();
+ for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
+ StringBuilder families = new StringBuilder();
+ for (int i = 0; i < r.getValue().size(); i++) {
+ if (i > 0) {
+ families.append(",");
+ }
+ families.append(Bytes.toString(r.getValue().get(i)));
}
- sb.append(Bytes.toStringBinary(regions[i]));
+ listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
}
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
- "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
+ "; forcing (partial) flush of " + regions.size() + " region(s): " +
+ StringUtils.join(",", listForPrint));
}
return regions;
}
@@ -778,14 +784,14 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
@Override
- public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
rollWriterLock.lock();
try {
// Return if nothing to flush.
if (!force && this.writer != null && this.numEntries.get() <= 0) {
return null;
}
- byte[][] regionsToFlush = null;
+ Map<byte[], List<byte[]>> regionsToFlush = null;
if (this.closed) {
LOG.debug("WAL closed. Skipping rolling of writer");
return regionsToFlush;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index 9b5a4d7..ba66f5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@@ -431,10 +432,10 @@ class SequenceIdAccounting {
* {@link #lowestUnflushedSequenceIds} has a sequence id less than that passed in
* <code>sequenceids</code> then return it.
* @param sequenceids Sequenceids keyed by encoded region name.
- * @return regions found in this instance with sequence ids less than those passed in.
+ * @return stores of regions found in this instance with sequence ids less than those passed in.
*/
- byte[][] findLower(Map<byte[], Long> sequenceids) {
- List<byte[]> toFlush = null;
+ Map<byte[], List<byte[]>> findLower(Map<byte[], Long> sequenceids) {
+ Map<byte[], List<byte[]>> toFlush = null;
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (tieLock) {
for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
@@ -442,16 +443,17 @@ class SequenceIdAccounting {
if (m == null) {
continue;
}
- // The lowest sequence id outstanding for this region.
- long lowest = getLowestSequenceId(m);
- if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
- if (toFlush == null) {
- toFlush = new ArrayList<>();
+ for (Map.Entry<ImmutableByteArray, Long> me : m.entrySet()) {
+ if (me.getValue() <= e.getValue()) {
+ if (toFlush == null) {
+ toFlush = new TreeMap(Bytes.BYTES_COMPARATOR);
+ }
+ toFlush.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
+ .add(Bytes.toBytes(me.getKey().toString()));
}
- toFlush.add(e.getKey());
}
}
}
- return toFlush == null ? null : toFlush.toArray(new byte[0][]);
+ return toFlush;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index 3154c19..6999020 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -23,6 +23,7 @@ import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -44,8 +45,8 @@ import org.slf4j.LoggerFactory;
* NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
* there is something to do, rather than the Chore sleep time which is invariant.
* <p/>
- * The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a
- * region server but we still want to roll its WAL.
+ * The {@link #scheduleFlush(String, List)} is abstract here,
+ * as sometimes we hold a region without a region server but we still want to roll its WAL.
* <p/>
* TODO: change to a pool of threads
*/
@@ -180,11 +181,12 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
// Force the roll if the logroll.period is elapsed or if a roll was requested.
- // The returned value is an array of actual region names.
- byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
+ // The returned value is a map from actual region name to the family names to flush.
+ Map<byte[], List<byte[]>> regionsToFlush = wal.rollWriter(periodic ||
+ entry.getValue().booleanValue());
if (regionsToFlush != null) {
- for (byte[] r : regionsToFlush) {
- scheduleFlush(Bytes.toString(r));
+ for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
+ scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
}
}
afterRoll(wal);
@@ -211,8 +213,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
/**
* @param encodedRegionName Encoded name of region to flush.
+ * @param families stores of region to flush.
*/
- protected abstract void scheduleFlush(String encodedRegionName);
+ protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);
private boolean isWaiting() {
Thread.State state = getState();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 98773c2..dbc08cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -115,7 +115,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public byte[][] rollWriter() {
+ public Map<byte[], List<byte[]>> rollWriter() {
if (!listeners.isEmpty()) {
for (WALActionsListener listener : listeners) {
listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
@@ -139,7 +139,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public byte[][] rollWriter(boolean force) {
+ public Map<byte[], List<byte[]>> rollWriter(boolean force) {
return rollWriter();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 7f7c412..26398c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
@@ -61,11 +62,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* The implementation is synchronized in order to make sure there's one rollWriter
* running at any given time.
*
- * @return If lots of logs, flush the returned regions so next time through we
+ * @return If lots of logs, flush the stores of returned regions so next time through we
* can clean logs. Returns null if nothing to flush. Names are actual
* region names as returned by {@link RegionInfo#getEncodedName()}
*/
- byte[][] rollWriter() throws FailedLogCloseException, IOException;
+ Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException;
/**
* Roll the log writer. That is, start writing log messages to a new file.
@@ -77,11 +78,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* @param force
* If true, force creation of a new writer even if no entries have
* been written to the current writer
- * @return If lots of logs, flush the returned regions so next time through we
+ * @return If lots of logs, flush the stores of returned regions so next time through we
* can clean logs. Returns null if nothing to flush. Names are actual
* region names as returned by {@link RegionInfo#getEncodedName()}
*/
- byte[][] rollWriter(boolean force) throws IOException;
+ Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException;
/**
* Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 198e64b..d82915c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -121,8 +123,9 @@ public class TestFailedAppendAndSync {
}
@Override
- public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
- byte [][] regions = super.rollWriter(force);
+ public Map<byte[], List<byte[]>> rollWriter(boolean force)
+ throws FailedLogCloseException, IOException {
+ Map<byte[], List<byte[]>> regions = super.rollWriter(force);
rolls.getAndIncrement();
return regions;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
index e91ff12..d273501 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
@@ -66,8 +66,8 @@ public class TestFlushRegionEntry {
HRegion r = mock(HRegion.class);
doReturn(hri).when(r).getRegionInfo();
- FlushRegionEntry entry = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
- FlushRegionEntry other = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
+ FlushRegionEntry entry = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
+ FlushRegionEntry other = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
assertEquals(entry.hashCode(), other.hashCode());
assertEquals(entry, other);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 896942e..436d8d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Iterator;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
@@ -139,11 +141,11 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
Thread.sleep(1500);
// No changes should be made by tuner as we already have lot of empty space
@@ -182,10 +184,10 @@ public class TestHeapMemoryManager {
// do some offheap flushes also. So there should be decrease in memstore but
// not as that when we don't have offheap flushes
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@@ -230,10 +232,10 @@ public class TestHeapMemoryManager {
// do some offheap flushes also. So there should be decrease in memstore but
// not as that when we don't have offheap flushes
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@@ -246,10 +248,10 @@ public class TestHeapMemoryManager {
// flushes are due to onheap overhead. This should once again call for increase in
// memstore size but that increase should be to the safe size
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@@ -312,10 +314,10 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@@ -326,8 +328,8 @@ public class TestHeapMemoryManager {
oldBlockCacheSize = blockCache.maxSize;
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@@ -361,10 +363,10 @@ public class TestHeapMemoryManager {
heapMemoryManager.start(choreService);
// this should not change anything with onheap memstore
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
Thread.sleep(1500);
// No changes should be made by tuner as we already have lot of empty space
@@ -448,9 +450,9 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null);
// Allow the tuner to run once and do necessary memory up
Thread.sleep(1500);
@@ -459,9 +461,9 @@ public class TestHeapMemoryManager {
assertEquals(oldBlockCacheSize, blockCache.maxSize);
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@@ -494,9 +496,9 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
// Allow the tuner to run once and do necessary memory up
@@ -506,7 +508,7 @@ public class TestHeapMemoryManager {
assertEquals(oldBlockCacheSize, blockCache.maxSize);
// Flushes that block updates
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
- memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+ memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
@@ -752,14 +754,19 @@ public class TestHeapMemoryManager {
}
@Override
- public boolean requestFlush(HRegion region, boolean forceFlushAllStores,
- FlushLifeCycleTracker tracker) {
+ public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
this.listener.flushRequested(flushType, region);
return true;
}
@Override
- public boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
+ public boolean requestFlush(HRegion region, List<byte[]> families,
+ FlushLifeCycleTracker tracker) {
+ return true;
+ }
+
+ @Override
+ public boolean requestDelayedFlush(HRegion region, long delay) {
return true;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
index 0de5cb0..9cd0ec2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
@@ -141,7 +141,7 @@ public class TestSplitWalDataLoss {
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
- rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
+ rs.getMemStoreFlusher().requestFlush(spiedRegion, FlushLifeCycleTracker.DUMMY);
synchronized (flushed) {
while (!flushed.booleanValue()) {
flushed.wait();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 2dd5f87..6262aea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -169,9 +171,9 @@ public abstract class AbstractTestFSWAL {
}
protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
- MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
+ MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
throws IOException {
- final byte[] row = Bytes.toBytes("row");
+ final byte[] row = Bytes.toBytes(cf);
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
@@ -253,8 +255,8 @@ public abstract class AbstractTestFSWAL {
* regions which should be flushed in order to archive the oldest wal file.
* <p>
* This method tests this behavior by inserting edits and rolling the wal enough times to reach
- * the max number of logs threshold. It checks whether we get the "right regions" for flush on
- * rolling the wal.
+ * the max number of logs threshold. It checks whether we get the "right regions and stores" for
+ * flush on rolling the wal.
* @throws Exception
*/
@Test
@@ -264,12 +266,23 @@ public abstract class AbstractTestFSWAL {
conf1.setInt("hbase.regionserver.maxlogs", 1);
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
+ String cf1 = "cf1";
+ String cf2 = "cf2";
+ String cf3 = "cf3";
TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
+
+ List<ColumnFamilyDescriptor> cfs = new ArrayList();
+ cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
+ cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
+ TableDescriptor t3 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t3"))
+ .setColumnFamilies(cfs).build();
+ RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();
+
// add edits and roll the wal
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -280,26 +293,30 @@ public abstract class AbstractTestFSWAL {
for (byte[] fam : t2.getColumnFamilyNames()) {
scopes2.put(fam, 0);
}
+ NavigableMap<byte[], Integer> scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (byte[] fam : t3.getColumnFamilyNames()) {
+ scopes3.put(fam, 0);
+ }
try {
- addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
wal.rollWriter();
// add some more edits and roll the wal. This would reach the log number threshold
- addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
wal.rollWriter();
// with above rollWriter call, the max logs limit is reached.
assertTrue(wal.getNumRolledLogFiles() == 2);
// get the regions to flush; since there is only one region in the oldest wal, it should
// return only one region.
- byte[][] regionsToFlush = wal.findRegionsToForceFlush();
- assertEquals(1, regionsToFlush.length);
- assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+ Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
+ assertEquals(1, regionsToFlush.size());
+ assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
// insert edits in second region
- addEdits(wal, hri2, t2, 2, mvcc, scopes2);
+ addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
// get the regions to flush, it should still read region1.
regionsToFlush = wal.findRegionsToForceFlush();
- assertEquals(1, regionsToFlush.length);
- assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+ assertEquals(1, regionsToFlush.size());
+ assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
// remain.
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
@@ -312,29 +329,50 @@ public abstract class AbstractTestFSWAL {
// no wal should remain now.
assertEquals(0, wal.getNumRolledLogFiles());
// add edits both to region 1 and region 2, and roll.
- addEdits(wal, hri1, t1, 2, mvcc, scopes1);
- addEdits(wal, hri2, t2, 2, mvcc, scopes2);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
+ addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
wal.rollWriter();
// add edits and roll the writer, to reach the max logs limit.
assertEquals(1, wal.getNumRolledLogFiles());
- addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
wal.rollWriter();
// it should return two regions to flush, as the oldest wal file has entries
// for both regions.
regionsToFlush = wal.findRegionsToForceFlush();
- assertEquals(2, regionsToFlush.length);
+ assertEquals(2, regionsToFlush.size());
// flush both regions
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
wal.rollWriter(true);
assertEquals(0, wal.getNumRolledLogFiles());
// Add an edit to region1, and roll the wal.
- addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
wal.rollWriter();
wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
assertEquals(1, wal.getNumRolledLogFiles());
+
+ // clear test data
+ flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
+ wal.rollWriter(true);
+ // add edits for three families
+ addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
+ addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
+ addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
+ wal.rollWriter();
+ addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
+ wal.rollWriter();
+ assertEquals(2, wal.getNumRolledLogFiles());
+ // flush one family before archiving the oldest wal
+ Set<byte[]> flushedFamilyNames = new HashSet<>();
+ flushedFamilyNames.add(Bytes.toBytes(cf1));
+ flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
+ regionsToFlush = wal.findRegionsToForceFlush();
+ // then only two families need to be flushed when archiving the oldest wal
+ assertEquals(1, regionsToFlush.size());
+ assertEquals(hri3.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
+ assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
} finally {
if (wal != null) {
wal.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index dbbe228..a874aba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -1109,9 +1109,9 @@ public abstract class AbstractTestWALReplay {
private HRegion r;
@Override
- public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) {
+ public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
try {
- r.flush(force);
+ r.flush(false);
return true;
} catch (IOException e) {
throw new RuntimeException("Exception flushing", e);
@@ -1119,7 +1119,13 @@ public abstract class AbstractTestWALReplay {
}
@Override
- public boolean requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
+ public boolean requestFlush(HRegion region, List<byte[]> families,
+ FlushLifeCycleTracker tracker) {
+ return true;
+ }
+
+ @Override
+ public boolean requestDelayedFlush(HRegion region, long when) {
return true;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 5da1bfb..cbb740e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -124,7 +124,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < 10; i++) {
- addEdits(log, hri, htd, 1, mvcc, scopes);
+ addEdits(log, hri, htd, 1, mvcc, scopes, "row");
}
} finally {
log.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
index 754aedb..f7ada79 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
@@ -131,7 +131,7 @@ public class TestSequenceIdAccounting {
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
assertTrue(sida.findLower(m) == null);
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME));
- assertTrue(sida.findLower(m).length == 1);
+ assertTrue(sida.findLower(m).size() == 1);
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1);
assertTrue(sida.findLower(m) == null);
}