You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/08/10 03:04:30 UTC
[hbase] branch branch-2 updated: HBASE-23157 WAL unflushed seqId
tracking may wrong when Durability.ASYNC_WAL is used (#762)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new ef7b9eb HBASE-23157 WAL unflushed seqId tracking may wrong when Durability.ASYNC_WAL is used (#762)
ef7b9eb is described below
commit ef7b9eb36eed4bcd7c0408e86003cd355e508e3e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Aug 10 09:11:31 2020 +0800
HBASE-23157 WAL unflushed seqId tracking may wrong when Durability.ASYNC_WAL is used (#762)
Signed-off-by: stack <st...@apache.org>
---
.../hadoop/hbase/util/ImmutableByteArray.java | 1 +
.../apache/hadoop/hbase/regionserver/HRegion.java | 2 +-
.../hbase/regionserver/wal/AbstractFSWAL.java | 4 +-
.../regionserver/wal/SequenceIdAccounting.java | 29 ++-
.../hadoop/hbase/wal/DisabledWALProvider.java | 2 +-
.../main/java/org/apache/hadoop/hbase/wal/WAL.java | 6 +-
.../hbase/regionserver/wal/AbstractTestFSWAL.java | 232 ++++++++++++++-------
.../regionserver/wal/AbstractTestWALReplay.java | 8 +-
.../regionserver/wal/TestSequenceIdAccounting.java | 10 +-
.../hadoop/hbase/wal/TestFSHLogProvider.java | 7 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 4 +-
11 files changed, 206 insertions(+), 99 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
index 1232b9c..57f7cbc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
@@ -48,6 +48,7 @@ public final class ImmutableByteArray {
return new ImmutableByteArray(b);
}
+ @Override
public String toString() {
return Bytes.toStringBinary(b);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 08aa8fb..f62da4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2953,7 +2953,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we get to here, the HStores have been written.
if (wal != null) {
- wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+ wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
}
// Record latest flush time
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 447eff4..3b688d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -499,8 +499,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
@Override
- public void completeCacheFlush(byte[] encodedRegionName) {
- this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
+ public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+ this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index ba66f5b..27a4c8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -351,9 +351,36 @@ class SequenceIdAccounting {
return lowestUnflushedInRegion;
}
- void completeCacheFlush(final byte[] encodedRegionName) {
+ void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+ // This is a simple hack to keep maxFlushedSeqId from going backwards.
+ // The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
+ // to flush all the stores, the maxFlushedSeqId will be next seq id of the region, but we may
+ // still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush, and
+ // then it will be recorded as the lowestUnflushedSeqId by the above update method, which is
+ // less than the current maxFlushedSeqId. And if next time we only flush the family with this
+ // unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.
+ // This is unexpected behavior, so we should fix it; otherwise it may cause unexpected
+ // behavior in other areas.
+ // The solution here is a bit of a hack but fine. Just replace the lowestUnflushedSeqId with
+ // maxFlushedSeqId + 1 if it is smaller. The meaning of maxFlushedSeqId is that all edits less
+ // than or equal to it have been flushed, i.e., persisted to HFile, so setting
+ // lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
+ // And technically, using +1 is fine here. If the maxFlushedSeqId is just the flushOpSeqId, it
+ // means we have flushed all the stores so the seq id for actual data should be at least plus 1.
+ // And if we do not flush all the stores, then the maxFlushedSeqId is calculated by
+ // lowestUnflushedSeqId - 1, so here let's plus the 1 back.
+ Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
synchronized (tieLock) {
this.flushingSequenceIds.remove(encodedRegionName);
+ Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
+ if (unflushed == null) {
+ return;
+ }
+ for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
+ if (e.getValue().longValue() <= maxFlushedSeqId) {
+ e.setValue(wrappedSeqId);
+ }
+ }
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index dbc08cc..0ff2195 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -224,7 +224,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public void completeCacheFlush(final byte[] encodedRegionName) {
+ public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 26398c1..61d5eb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -185,7 +185,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* being flushed; in other words, this is effectively same as a flush of all of the region
* though we were passed a subset of regions. Otherwise, it returns the sequence id of the
* oldest/lowest outstanding edit.
- * @see #completeCacheFlush(byte[])
+ * @see #completeCacheFlush(byte[], long)
* @see #abortCacheFlush(byte[])
*/
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
@@ -195,10 +195,12 @@ public interface WAL extends Closeable, WALFileLengthProvider {
/**
* Complete the cache flush.
* @param encodedRegionName Encoded region name.
+ * @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that is
+ * less than this sequence id.
* @see #startCacheFlush(byte[], Set)
* @see #abortCacheFlush(byte[])
*/
- void completeCacheFlush(final byte[] encodedRegionName);
+ void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
/**
* Abort a cache flush. Call if the flush fails. Note that the only recovery
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 6262aea..1194a10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -38,10 +40,12 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +71,10 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -82,7 +89,6 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
-
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -188,12 +194,10 @@ public abstract class AbstractTestFSWAL {
/**
* helper method to simulate region flush for a WAL.
- * @param wal
- * @param regionEncodedName
*/
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
- wal.completeCacheFlush(regionEncodedName);
+ wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
}
/**
@@ -350,7 +354,7 @@ public abstract class AbstractTestFSWAL {
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
wal.rollWriter();
- wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+ wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
assertEquals(1, wal.getNumRolledLogFiles());
// clear test data
@@ -526,93 +530,165 @@ public abstract class AbstractTestFSWAL {
}
}
- @Test
- public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
- final String testName = currentTest.getMethodName();
- final byte[] b = Bytes.toBytes("b");
-
- final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
- final CountDownLatch holdAppend = new CountDownLatch(1);
- final CountDownLatch closeFinished = new CountDownLatch(1);
- final CountDownLatch putFinished = new CountDownLatch(1);
-
- try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
- HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
- wal.init();
- wal.registerWALActionsListener(new WALActionsListener() {
- @Override
- public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
- if (startHoldingForAppend.get()) {
- try {
- holdAppend.await();
- } catch (InterruptedException e) {
- LOG.error(e.toString(), e);
- }
- }
- }
- });
-
- // open a new region which uses this WAL
- TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
- RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
- ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
- TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
- RegionServerServices rsServices = mock(RegionServerServices.class);
- when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
- when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
- final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
- TEST_UTIL.getConfiguration(), rsServices, null);
-
- ExecutorService exec = Executors.newFixedThreadPool(2);
-
- // do a regular write first because of memstore size calculation.
- region.put(new Put(b).addColumn(b, b, b));
-
- startHoldingForAppend.set(true);
- exec.submit(new Runnable() {
- @Override
- public void run() {
+ private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
+ CountDownLatch holdAppend) throws IOException {
+ AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
+ HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
+ wal.init();
+ wal.registerWALActionsListener(new WALActionsListener() {
+ @Override
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+ if (startHoldingForAppend.get()) {
try {
- region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
- putFinished.countDown();
- } catch (IOException e) {
+ holdAppend.await();
+ } catch (InterruptedException e) {
LOG.error(e.toString(), e);
}
}
- });
+ }
+ });
+ return wal;
+ }
- // give the put a chance to start
- Threads.sleep(3000);
+ private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
+ throws IOException {
+ RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+ TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
+ RegionServerServices rsServices = mock(RegionServerServices.class);
+ when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
+ when(rsServices.getConfiguration()).thenReturn(conf);
+ return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
+ }
- exec.submit(new Runnable() {
- @Override
- public void run() {
- try {
- Map<?, ?> closeResult = region.close();
- LOG.info("Close result:" + closeResult);
- closeFinished.countDown();
- } catch (IOException e) {
- LOG.error(e.toString(), e);
- }
- }
- });
+ private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
+ Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
+ CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
+ throws InterruptedException, IOException {
+ // do a regular write first because of memstore size calculation.
+ region.put(put);
- // give the flush a chance to start. Flush should have got the region lock, and
- // should have been waiting on the mvcc complete after this.
- Threads.sleep(3000);
+ startHoldingForAppend.set(true);
+ region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
+
+ // give the put a chance to start
+ Threads.sleep(3000);
+
+ exec.submit(flushOrCloseRegion);
+
+ // give the flush a chance to start. Flush should have got the region lock, and
+ // should have been waiting on the mvcc complete after this.
+ Threads.sleep(3000);
+
+ // let the append to WAL go through now that the flush already started
+ holdAppend.countDown();
+ flushOrCloseFinished.await();
+ }
- // let the append to WAL go through now that the flush already started
- holdAppend.countDown();
- putFinished.await();
- closeFinished.await();
+ // Testcase for HBASE-23181
+ @Test
+ public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
+ String testName = currentTest.getMethodName();
+ byte[] b = Bytes.toBytes("b");
+ TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+ AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+ CountDownLatch holdAppend = new CountDownLatch(1);
+ CountDownLatch closeFinished = new CountDownLatch(1);
+ ExecutorService exec = Executors.newFixedThreadPool(1);
+ AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+ // open a new region which uses this WAL
+ HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
+ try {
+ doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
+ try {
+ Map<?, ?> closeResult = region.close();
+ LOG.info("Close result:" + closeResult);
+ closeFinished.countDown();
+ } catch (IOException e) {
+ LOG.error(e.toString(), e);
+ }
+ }, startHoldingForAppend, closeFinished, holdAppend);
// now check the region's unflushed seqIds.
- long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
+ long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
seqId);
+ } finally {
+ exec.shutdownNow();
+ region.close();
+ wal.close();
+ }
+ }
+
+ private static final Set<byte[]> STORES_TO_FLUSH =
+ Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
+ // Testcase for HBASE-23157
+ @Test
+ public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
+ String testName = currentTest.getMethodName();
+ byte[] a = Bytes.toBytes("a");
+ byte[] b = Bytes.toBytes("b");
+ TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+ AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+ CountDownLatch holdAppend = new CountDownLatch(1);
+ CountDownLatch flushFinished = new CountDownLatch(1);
+ ExecutorService exec = Executors.newFixedThreadPool(2);
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
+ FlushPolicy.class);
+ AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+ // open a new region which uses this WAL
+ HRegion region = createHoldingHRegion(conf, htd, wal);
+ try {
+ Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
+ doPutWithAsyncWAL(exec, region, put, () -> {
+ try {
+ HRegion.FlushResult flushResult = region.flush(true);
+ LOG.info("Flush result:" + flushResult.getResult());
+ LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
+ flushFinished.countDown();
+ } catch (IOException e) {
+ LOG.error(e.toString(), e);
+ }
+ }, startHoldingForAppend, flushFinished, holdAppend);
+
+ // get the max flushed sequence id after the first flush
+ long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
+
+ region.put(put);
+ // this time we only flush family a
+ STORES_TO_FLUSH.add(a);
+ region.flush(false);
+
+ // get the max flushed sequence id after the second flush
+ long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
+ // make sure that the maxFlushedSequenceId does not go backwards
+ assertTrue(
+ "maxFlushedSeqId1(" + maxFlushedSeqId1 +
+ ") is not greater than or equal to maxFlushedSeqId2(" + maxFlushedSeqId2 + ")",
+ maxFlushedSeqId1 <= maxFlushedSeqId2);
+ } finally {
+ exec.shutdownNow();
+ region.close();
wal.close();
}
}
+
+ public static final class FlushSpecificStoresPolicy extends FlushPolicy {
+
+ @Override
+ public Collection<HStore> selectStoresToFlush() {
+ if (STORES_TO_FLUSH.isEmpty()) {
+ return region.getStores();
+ } else {
+ return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
+ }
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index a874aba..9c069bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -794,7 +794,7 @@ public abstract class AbstractTestWALReplay {
// Add a cache flush, shouldn't have any effect
wal.startCacheFlush(regionName, familyNames);
- wal.completeCacheFlush(regionName);
+ wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();
@@ -896,7 +896,7 @@ public abstract class AbstractTestWALReplay {
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
// set of edits.
- wal.completeCacheFlush(hri.getEncodedNameAsBytes());
+ wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
wal.shutdown();
FileStatus[] listStatus = wal.getFiles();
assertNotNull(listStatus);
@@ -1079,11 +1079,11 @@ public abstract class AbstractTestWALReplay {
}
@Override
- public void completeCacheFlush(byte[] encodedRegionName) {
+ public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
if (!doCompleteCacheFlush) {
return;
}
- super.completeCacheFlush(encodedRegionName);
+ super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
index f7ada79..ad52bd3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
@@ -55,12 +55,12 @@ public class TestSequenceIdAccounting {
Map<byte[], Long> m = new HashMap<>();
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
- sida.completeCacheFlush(ENCODED_REGION_NAME);
+ sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
long sequenceid = 1;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
// Only one family so should return NO_SEQNUM still.
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
- sida.completeCacheFlush(ENCODED_REGION_NAME);
+ sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
long currentSequenceId = sequenceid;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
final Set<byte[]> otherFamily = new HashSet<>(1);
@@ -68,7 +68,7 @@ public class TestSequenceIdAccounting {
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
// Should return oldest sequence id in the region.
assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
- sida.completeCacheFlush(ENCODED_REGION_NAME);
+ sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
}
@Test
@@ -95,7 +95,7 @@ public class TestSequenceIdAccounting {
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
assertTrue(sida.areAllLower(m));
// Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
- sida.completeCacheFlush(ENCODED_REGION_NAME);
+ sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
m.put(ENCODED_REGION_NAME, sequenceid);
assertTrue(sida.areAllLower(m));
// Flush again but add sequenceids while we are flushing.
@@ -108,7 +108,7 @@ public class TestSequenceIdAccounting {
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
// The cache flush will clear out all sequenceid accounting by region.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
- sida.completeCacheFlush(ENCODED_REGION_NAME);
+ sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
// No new edits have gone in so no sequenceid to work with.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
// Make an edit behind all we'll put now into sida.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
index 82eefb2..72e4998 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -178,7 +179,7 @@ public class TestFSHLogProvider {
*/
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
- wal.completeCacheFlush(regionEncodedName);
+ wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
}
@Test
@@ -230,7 +231,7 @@ public class TestFSHLogProvider {
// archived. We need to append something or writer won't be rolled.
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
- log.completeCacheFlush(hri.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.rollWriter();
int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
assertEquals(2, count);
@@ -240,7 +241,7 @@ public class TestFSHLogProvider {
// flush information
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
- log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.rollWriter();
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
} finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 81db235..74ab840 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -528,7 +528,7 @@ public class TestWALFactory {
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
log.sync(txid);
log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
- log.completeCacheFlush(info.getEncodedNameAsBytes());
+ log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.shutdown();
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.
@@ -584,7 +584,7 @@ public class TestWALFactory {
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
log.sync(txid);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
- log.completeCacheFlush(hri.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
log.shutdown();
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.