Posted to commits@hbase.apache.org by zh...@apache.org on 2020/08/10 03:04:30 UTC

[hbase] branch branch-2 updated: HBASE-23157 WAL unflushed seqId tracking may be wrong when Durability.ASYNC_WAL is used (#762)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ef7b9eb  HBASE-23157 WAL unflushed seqId tracking may be wrong when Durability.ASYNC_WAL is used (#762)
ef7b9eb is described below

commit ef7b9eb36eed4bcd7c0408e86003cd355e508e3e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Aug 10 09:11:31 2020 +0800

    HBASE-23157 WAL unflushed seqId tracking may be wrong when Durability.ASYNC_WAL is used (#762)
    
    Signed-off-by: stack <st...@apache.org>
---
 .../hadoop/hbase/util/ImmutableByteArray.java      |   1 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   2 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   4 +-
 .../regionserver/wal/SequenceIdAccounting.java     |  29 ++-
 .../hadoop/hbase/wal/DisabledWALProvider.java      |   2 +-
 .../main/java/org/apache/hadoop/hbase/wal/WAL.java |   6 +-
 .../hbase/regionserver/wal/AbstractTestFSWAL.java  | 232 ++++++++++++++-------
 .../regionserver/wal/AbstractTestWALReplay.java    |   8 +-
 .../regionserver/wal/TestSequenceIdAccounting.java |  10 +-
 .../hadoop/hbase/wal/TestFSHLogProvider.java       |   7 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |   4 +-
 11 files changed, 206 insertions(+), 99 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
index 1232b9c..57f7cbc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
@@ -48,6 +48,7 @@ public final class ImmutableByteArray {
     return new ImmutableByteArray(b);
   }
 
+  @Override
   public String toString() {
     return Bytes.toStringBinary(b);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 08aa8fb..f62da4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2953,7 +2953,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     // If we get to here, the HStores have been written.
     if (wal != null) {
-      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
     }
 
     // Record latest flush time
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 447eff4..3b688d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -499,8 +499,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   }
 
   @Override
-  public void completeCacheFlush(byte[] encodedRegionName) {
-    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
+  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index ba66f5b..27a4c8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -351,9 +351,36 @@ class SequenceIdAccounting {
     return lowestUnflushedInRegion;
   }
 
-  void completeCacheFlush(final byte[] encodedRegionName) {
+  void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+    // This is a simple hack to prevent maxFlushedSeqId from going backwards.
+    // The system works fine normally, but if we use Durability.ASYNC_WAL and we are going to
+    // flush all the stores, the maxFlushedSeqId will be the next seq id of the region. However,
+    // we may still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush,
+    // and then one of them will be recorded as the lowestUnflushedSeqId by the above update
+    // method, which is less than the current maxFlushedSeqId. If next time we only flush the
+    // family with this unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.
+    // This is unexpected behavior so we should fix it, otherwise it may cause problems in other
+    // areas.
+    // The solution here is a bit hacky but fine. Just replace the lowestUnflushedSeqId with
+    // maxFlushedSeqId + 1 if it is less. The meaning of maxFlushedSeqId is that all edits less
+    // than or equal to it have been flushed, i.e., persisted to HFile, so setting
+    // lowestUnflushedSeqId to maxFlushedSeqId + 1 will not cause data loss.
+    // And technically, using +1 is fine here. If the maxFlushedSeqId is just the flushOpSeqId, it
+    // means we have flushed all the stores, so the seq id for actual data must be at least
+    // flushOpSeqId + 1. And if we did not flush all the stores, then the maxFlushedSeqId was
+    // calculated as lowestUnflushedSeqId - 1, so here we add the 1 back.
+    Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
     synchronized (tieLock) {
       this.flushingSequenceIds.remove(encodedRegionName);
+      Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
+      if (unflushed == null) {
+        return;
+      }
+      for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
+        if (e.getValue().longValue() <= maxFlushedSeqId) {
+          e.setValue(wrappedSeqId);
+        }
+      }
     }
   }
 
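To make the accounting change above easier to follow, here is a minimal standalone sketch of the
same bump logic, using a plain TreeMap keyed by String in place of the real
lowestUnflushedSequenceIds map keyed by ImmutableByteArray; the class and field names below are
illustrative only, not the HBase implementation.

    import java.util.Map;
    import java.util.TreeMap;

    public class SeqIdBumpSketch {

      // Simplified stand-in for SequenceIdAccounting's per-family tracking.
      private final Map<String, Long> lowestUnflushedSeqIds = new TreeMap<>();

      void completeCacheFlush(long maxFlushedSeqId) {
        // All edits <= maxFlushedSeqId are persisted, so no family may keep an
        // unflushed seq id at or below it; bump such entries to maxFlushedSeqId + 1.
        Long bumped = maxFlushedSeqId + 1;
        for (Map.Entry<String, Long> e : lowestUnflushedSeqIds.entrySet()) {
          if (e.getValue().longValue() <= maxFlushedSeqId) {
            e.setValue(bumped);
          }
        }
      }

      public static void main(String[] args) {
        SeqIdBumpSketch s = new SeqIdBumpSketch();
        // Family "b" picked up a late ASYNC_WAL entry with seq id 10 while the
        // full flush already recorded maxFlushedSeqId = 12.
        s.lowestUnflushedSeqIds.put("a", 13L);
        s.lowestUnflushedSeqIds.put("b", 10L);
        s.completeCacheFlush(12);
        // Without the bump, a later flush of only "b" would compute
        // maxFlushedSeqId = 10 - 1 = 9, going backwards from 12.
        System.out.println(s.lowestUnflushedSeqIds); // prints {a=13, b=13}
      }
    }
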
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index dbc08cc..0ff2195 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -224,7 +224,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public void completeCacheFlush(final byte[] encodedRegionName) {
+    public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
     }
 
     @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 26398c1..61d5eb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -185,7 +185,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
    * being flushed; in other words, this is effectively same as a flush of all of the region
    * though we were passed a subset of regions. Otherwise, it returns the sequence id of the
    * oldest/lowest outstanding edit.
-   * @see #completeCacheFlush(byte[])
+   * @see #completeCacheFlush(byte[], long)
    * @see #abortCacheFlush(byte[])
    */
   Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
@@ -195,10 +195,12 @@ public interface WAL extends Closeable, WALFileLengthProvider {
   /**
    * Complete the cache flush.
    * @param encodedRegionName Encoded region name.
+   * @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that is
+   *          less than or equal to this sequence id.
    * @see #startCacheFlush(byte[], Set)
    * @see #abortCacheFlush(byte[])
    */
-  void completeCacheFlush(final byte[] encodedRegionName);
+  void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
 
   /**
    * Abort a cache flush. Call if the flush fails. Note that the only recovery
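
The javadoc above defines the flush lifecycle this patch touches: every startCacheFlush must be
paired with either completeCacheFlush (on success) or abortCacheFlush (on failure). Below is a
hedged sketch of how a caller might drive that contract; the tryFlushStores helper is a
hypothetical stand-in for the real store-flush work, not HRegion's actual code path.

    import java.io.IOException;
    import java.util.Set;

    import org.apache.hadoop.hbase.wal.WAL;

    class FlushLifecycleSketch {

      void flush(WAL wal, byte[] encodedRegionName, Set<byte[]> families) throws IOException {
        // May return the lowest outstanding seq id or HConstants.NO_SEQNUM; see the
        // startCacheFlush javadoc above.
        Long oldestUnflushedSeqId = wal.startCacheFlush(encodedRegionName, families);
        try {
          // Hypothetical helper: persist the selected memstores and report the
          // highest seq id whose edits are now all on disk.
          long maxFlushedSeqId = tryFlushStores(families);
          // Success: no edit <= maxFlushedSeqId remains in memory for these families.
          wal.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
        } catch (IOException e) {
          // Failure: restore the pre-flush accounting so the WAL keeps tracking the edits.
          wal.abortCacheFlush(encodedRegionName);
          throw e;
        }
      }

      private long tryFlushStores(Set<byte[]> families) throws IOException {
        return 0; // placeholder for the actual flush-to-HFile work
      }
    }
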
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 6262aea..1194a10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -38,10 +40,12 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +71,10 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -82,7 +89,6 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -188,12 +194,10 @@ public abstract class AbstractTestFSWAL {
 
   /**
    * helper method to simulate region flush for a WAL.
-   * @param wal
-   * @param regionEncodedName
    */
   protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
     wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
-    wal.completeCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
   }
 
   /**
@@ -350,7 +354,7 @@ public abstract class AbstractTestFSWAL {
       // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
       wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
       wal.rollWriter();
-      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+      wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
       assertEquals(1, wal.getNumRolledLogFiles());
 
       // clear test data
@@ -526,93 +530,165 @@ public abstract class AbstractTestFSWAL {
     }
   }
 
-  @Test
-  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
-    final String testName = currentTest.getMethodName();
-    final byte[] b = Bytes.toBytes("b");
-
-    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
-    final CountDownLatch holdAppend = new CountDownLatch(1);
-    final CountDownLatch closeFinished = new CountDownLatch(1);
-    final CountDownLatch putFinished = new CountDownLatch(1);
-
-    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
-      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
-      wal.init();
-      wal.registerWALActionsListener(new WALActionsListener() {
-        @Override
-        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
-          if (startHoldingForAppend.get()) {
-            try {
-              holdAppend.await();
-            } catch (InterruptedException e) {
-              LOG.error(e.toString(), e);
-            }
-          }
-        }
-      });
-
-      // open a new region which uses this WAL
-      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
-        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
-      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
-      ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
-      TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
-      RegionServerServices rsServices = mock(RegionServerServices.class);
-      when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
-      when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-      final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
-        TEST_UTIL.getConfiguration(), rsServices, null);
-
-      ExecutorService exec = Executors.newFixedThreadPool(2);
-
-      // do a regular write first because of memstore size calculation.
-      region.put(new Put(b).addColumn(b, b, b));
-
-      startHoldingForAppend.set(true);
-      exec.submit(new Runnable() {
-        @Override
-        public void run() {
+  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
+    CountDownLatch holdAppend) throws IOException {
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
+      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
+    wal.init();
+    wal.registerWALActionsListener(new WALActionsListener() {
+      @Override
+      public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+        if (startHoldingForAppend.get()) {
           try {
-            region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
-            putFinished.countDown();
-          } catch (IOException e) {
+            holdAppend.await();
+          } catch (InterruptedException e) {
             LOG.error(e.toString(), e);
           }
         }
-      });
+      }
+    });
+    return wal;
+  }
 
-      // give the put a chance to start
-      Threads.sleep(3000);
+  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
+    throws IOException {
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+    TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
+    RegionServerServices rsServices = mock(RegionServerServices.class);
+    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
+    when(rsServices.getConfiguration()).thenReturn(conf);
+    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
+  }
 
-      exec.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            Map<?, ?> closeResult = region.close();
-            LOG.info("Close result:" + closeResult);
-            closeFinished.countDown();
-          } catch (IOException e) {
-            LOG.error(e.toString(), e);
-          }
-        }
-      });
+  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
+    Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
+    CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
+    throws InterruptedException, IOException {
+    // do a regular write first because of memstore size calculation.
+    region.put(put);
 
-      // give the flush a chance to start. Flush should have got the region lock, and
-      // should have been waiting on the mvcc complete after this.
-      Threads.sleep(3000);
+    startHoldingForAppend.set(true);
+    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
+
+    // give the put a chance to start
+    Threads.sleep(3000);
+
+    exec.submit(flushOrCloseRegion);
+
+    // give the flush a chance to start. Flush should have got the region lock, and
+    // should have been waiting on the mvcc complete after this.
+    Threads.sleep(3000);
+
+    // let the append to WAL go through now that the flush already started
+    holdAppend.countDown();
+    flushOrCloseFinished.await();
+  }
 
-      // let the append to WAL go through now that the flush already started
-      holdAppend.countDown();
-      putFinished.await();
-      closeFinished.await();
+  // Testcase for HBASE-23181
+  @Test
+  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
+    String testName = currentTest.getMethodName();
+    byte[] b = Bytes.toBytes("b");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    CountDownLatch holdAppend = new CountDownLatch(1);
+    CountDownLatch closeFinished = new CountDownLatch(1);
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+    // open a new region which uses this WAL
+    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
+    try {
+      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
+        try {
+          Map<?, ?> closeResult = region.close();
+          LOG.info("Close result:" + closeResult);
+          closeFinished.countDown();
+        } catch (IOException e) {
+          LOG.error(e.toString(), e);
+        }
+      }, startHoldingForAppend, closeFinished, holdAppend);
 
       // now check the region's unflushed seqIds.
-      long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
+      long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
       assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
         seqId);
+    } finally {
+      exec.shutdownNow();
+      region.close();
+      wal.close();
+    }
+  }
+
+  private static final Set<byte[]> STORES_TO_FLUSH =
+    Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
 
+  // Testcase for HBASE-23157
+  @Test
+  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
+    String testName = currentTest.getMethodName();
+    byte[] a = Bytes.toBytes("a");
+    byte[] b = Bytes.toBytes("b");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    CountDownLatch holdAppend = new CountDownLatch(1);
+    CountDownLatch flushFinished = new CountDownLatch(1);
+    ExecutorService exec = Executors.newFixedThreadPool(2);
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
+      FlushPolicy.class);
+    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+    // open a new region which uses this WAL
+    HRegion region = createHoldingHRegion(conf, htd, wal);
+    try {
+      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
+      doPutWithAsyncWAL(exec, region, put, () -> {
+        try {
+          HRegion.FlushResult flushResult = region.flush(true);
+          LOG.info("Flush result:" + flushResult.getResult());
+          LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
+          flushFinished.countDown();
+        } catch (IOException e) {
+          LOG.error(e.toString(), e);
+        }
+      }, startHoldingForAppend, flushFinished, holdAppend);
+
+      // get the max flushed sequence id after the first flush
+      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
+
+      region.put(put);
+      // this time we only flush family a
+      STORES_TO_FLUSH.add(a);
+      region.flush(false);
+
+      // get the max flushed sequence id after the second flush
+      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
+      // make sure that the maxFlushedSequenceId does not go backwards
+      assertTrue(
+        "maxFlushedSeqId2(" + maxFlushedSeqId2 +
+          ") is not greater than or equal to maxFlushedSeqId1(" + maxFlushedSeqId1 + ")",
+        maxFlushedSeqId1 <= maxFlushedSeqId2);
+    } finally {
+      exec.shutdownNow();
+      region.close();
       wal.close();
     }
   }
+
+  public static final class FlushSpecificStoresPolicy extends FlushPolicy {
+
+    @Override
+    public Collection<HStore> selectStoresToFlush() {
+      if (STORES_TO_FLUSH.isEmpty()) {
+        return region.getStores();
+      } else {
+        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index a874aba..9c069bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -794,7 +794,7 @@ public abstract class AbstractTestWALReplay {
 
     // Add a cache flush, shouldn't have any effect
     wal.startCacheFlush(regionName, familyNames);
-    wal.completeCacheFlush(regionName);
+    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
 
     // Add an edit to another family, should be skipped.
     WALEdit edit = new WALEdit();
@@ -896,7 +896,7 @@ public abstract class AbstractTestWALReplay {
     wal.doCompleteCacheFlush = true;
     // allow complete cache flush with the previous seq number got after first
     // set of edits.
-    wal.completeCacheFlush(hri.getEncodedNameAsBytes());
+    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     wal.shutdown();
     FileStatus[] listStatus = wal.getFiles();
     assertNotNull(listStatus);
@@ -1079,11 +1079,11 @@ public abstract class AbstractTestWALReplay {
     }
 
     @Override
-    public void completeCacheFlush(byte[] encodedRegionName) {
+    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
       if (!doCompleteCacheFlush) {
         return;
       }
-      super.completeCacheFlush(encodedRegionName);
+      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
index f7ada79..ad52bd3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
@@ -55,12 +55,12 @@ public class TestSequenceIdAccounting {
     Map<byte[], Long> m = new HashMap<>();
     m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     long sequenceid = 1;
     sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
     // Only one family so should return NO_SEQNUM still.
     assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     long currentSequenceId = sequenceid;
     sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
     final Set<byte[]> otherFamily = new HashSet<>(1);
@@ -68,7 +68,7 @@ public class TestSequenceIdAccounting {
     sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
     // Should return oldest sequence id in the region.
     assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
   }
 
   @Test
@@ -95,7 +95,7 @@ public class TestSequenceIdAccounting {
     m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     assertTrue(sida.areAllLower(m));
     // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     m.put(ENCODED_REGION_NAME, sequenceid);
     assertTrue(sida.areAllLower(m));
     // Flush again but add sequenceids while we are flushing.
@@ -108,7 +108,7 @@ public class TestSequenceIdAccounting {
     sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
     // The cache flush will clear out all sequenceid accounting by region.
     assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     // No new edits have gone in so no sequenceid to work with.
     assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
     // Make an edit behind all we'll put now into sida.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
index 82eefb2..72e4998 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -178,7 +179,7 @@ public class TestFSHLogProvider {
    */
   protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
     wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
-    wal.completeCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
   }
 
   @Test
@@ -230,7 +231,7 @@ public class TestFSHLogProvider {
       // archived. We need to append something or writer won't be rolled.
       addEdits(log, hri2, htd2, 1, scopes2);
       log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
       log.rollWriter();
       int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
       assertEquals(2, count);
@@ -240,7 +241,7 @@ public class TestFSHLogProvider {
       // flush information
       addEdits(log, hri2, htd2, 1, scopes2);
       log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
-      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
       log.rollWriter();
       assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
     } finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 81db235..74ab840 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -528,7 +528,7 @@ public class TestWALFactory {
         htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
       log.sync(txid);
       log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-      log.completeCacheFlush(info.getEncodedNameAsBytes());
+      log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
       log.shutdown();
       Path filename = AbstractFSWALProvider.getCurrentFileName(log);
       // Now open a reader on the log and assert append worked.
@@ -584,7 +584,7 @@ public class TestWALFactory {
         htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
       log.sync(txid);
       log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
       log.shutdown();
       Path filename = AbstractFSWALProvider.getCurrentFileName(log);
       // Now open a reader on the log and assert append worked.