You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/24 10:02:49 UTC

[hbase] branch branch-2.5 updated: HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOldWALsDirectory (#5119)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 23ce03c7c58 HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOldWALsDirectory (#5119)
23ce03c7c58 is described below

commit 23ce03c7c581e7fb519903526623fb8cbefc2ff5
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Mar 24 14:17:39 2023 +0800

    HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOldWALsDirectory (#5119)
    
    Add a 'closed' flag in WALProps in AbstractFSWAL to indicate whether a WAL
    file has been closed; if not, we will not try to archive it. Will mark it as
    closed after we fully close it in the background close task, and try to archive
    again.
    
    Also modified some tests: since the archiving of a rolled WAL file is now
    asynchronous, we need to wait instead of asserting directly.
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    (cherry picked from commit 230fdc0b50578144d3e0916c33d4532bb8644763)
---
 .../hbase/regionserver/wal/AbstractFSWAL.java      |  63 +++++-
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |  50 +++--
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  32 ++-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |  12 +-
 .../org/apache/hadoop/hbase/client/TestAdmin2.java |   8 +-
 .../hbase/client/TestAsyncClusterAdminApi.java     |   8 +-
 .../regionserver/TestPerColumnFamilyFlush.java     |   7 +-
 .../hbase/regionserver/wal/AbstractTestFSWAL.java  | 199 +++++++++---------
 .../regionserver/wal/AbstractTestLogRolling.java   |  36 ++--
 .../hadoop/hbase/wal/TestFSHLogProvider.java       | 233 +++++++++++----------
 10 files changed, 361 insertions(+), 287 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 450b8f33e8f..17bfb61291a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -288,22 +288,29 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   final Comparator<Path> LOG_NAME_COMPARATOR =
     (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
 
-  private static final class WalProps {
+  private static final class WALProps {
 
     /**
      * Map the encoded region name to the highest sequence id.
      * <p/>
      * Contains all the regions it has an entry for.
      */
-    public final Map<byte[], Long> encodedName2HighestSequenceId;
+    private final Map<byte[], Long> encodedName2HighestSequenceId;
 
     /**
      * The log file size. Notice that the size may not be accurate if we do asynchronous close in
      * sub classes.
      */
-    public final long logSize;
+    private final long logSize;
 
-    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
+    /**
+     * If we do asynchronous close in sub classes, it is possible that when adding WALProps to the
+     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
+     * for safety.
+     */
+    private volatile boolean closed = false;
+
+    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
       this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
       this.logSize = logSize;
     }
@@ -313,7 +320,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
    * (contained in the log file name).
    */
-  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
+  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
     new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
 
   /**
@@ -332,6 +339,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
 
+  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
+
   // Run in caller if we get reject execution exception, to avoid aborting region server when we get
   // reject execution exception. Usually this should not happen but let's make it more robust.
   private final ExecutorService logArchiveExecutor =
@@ -679,7 +689,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     Map<byte[], List<byte[]>> regions = null;
     int logCount = getNumRolledLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
-      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
+      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
       regions =
         this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
     }
@@ -702,14 +712,35 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     return regions;
   }
 
+  /**
+   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
+   */
+  protected final void markClosedAndClean(Path path) {
+    WALProps props = walFile2Props.get(path);
+    // typically this should not be null, but if there is no big issue if it is already null, so
+    // let's make the code more robust
+    if (props != null) {
+      props.closed = true;
+      cleanOldLogs();
+    }
+  }
+
   /**
    * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
+   * <p/>
+   * Use synchronized because we may call this method in different threads, normally when replacing
+   * writer, and since now close writer may be asynchronous, we will also call this method in the
+   * closeExecutor, right after we actually close a WAL writer.
    */
-  private void cleanOldLogs() throws IOException {
+  private synchronized void cleanOldLogs() {
     List<Pair<Path, Long>> logsToArchive = null;
     // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
     // are older than what is currently in memory, the WAL can be GC'd.
-    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
+    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
+      if (!e.getValue().closed) {
+        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
+        continue;
+      }
       Path log = e.getKey();
       Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
       if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
@@ -791,7 +822,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
     if (oldPath != null) {
       this.walFile2Props.put(oldPath,
-        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
+        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
       this.totalLogSize.addAndGet(oldFileLen);
       LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
         CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
@@ -987,6 +1018,20 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       // and abort the region server
       logArchiveExecutor.shutdown();
     }
+    // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still
+    // have some pending archiving tasks not finished yet, and in close we may archive all the
+    // remaining WAL files, there could be race if we do not wait for the background archive task
+    // finish
+    try {
+      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
+        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
+          + " the shutdown of WAL doesn't complete! Please check the status of underlying "
+          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
+          + "\"");
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 9fa36630d49..c1e6c1b6907 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -35,7 +35,6 @@ import java.util.Queue;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -179,9 +178,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final long batchSize;
 
-  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
-
   private volatile AsyncFSOutput fsOut;
 
   private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
@@ -718,23 +714,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  protected final long closeWriter(AsyncWriter writer, Path path) {
-    if (writer != null) {
-      inflightWALClosures.put(path.getName(), writer);
-      long fileLength = writer.getLength();
-      closeExecutor.execute(() -> {
-        try {
-          writer.close();
-        } catch (IOException e) {
-          LOG.warn("close old writer failed", e);
-        } finally {
-          inflightWALClosures.remove(path.getName());
-        }
-      });
-      return fileLength;
-    } else {
-      return 0L;
-    }
+  private void closeWriter(AsyncWriter writer, Path path) {
+    inflightWALClosures.put(path.getName(), writer);
+    closeExecutor.execute(() -> {
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.warn("close old writer failed", e);
+      } finally {
+        // call this even if the above close fails, as there is no other chance we can set closed to
+        // true, it will not cause big problems.
+        markClosedAndClean(path);
+        inflightWALClosures.remove(path.getName());
+      }
+    });
   }
 
   @Override
@@ -742,8 +735,19 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     throws IOException {
     Preconditions.checkNotNull(nextWriter);
     waitForSafePoint();
-    long oldFileLen = closeWriter(this.writer, oldPath);
-    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+    // we will call rollWriter in init method, where we want to create the first writer and
+    // obviously the previous writer is null, so here we need this null check. And why we must call
+    // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after
+    // closing the writer asynchronously, we need to make sure the WALProps is put into
+    // walFile2Props before we call markClosedAndClean
+    if (writer != null) {
+      long oldFileLen = writer.getLength();
+      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+      closeWriter(writer, oldPath);
+    } else {
+      logRollAndSetupWalProps(oldPath, newPath, 0);
+    }
+
     this.writer = nextWriter;
     if (nextWriter instanceof AsyncProtobufLogWriter) {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 1df2d91e46b..6afe2e06794 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -36,8 +36,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -169,8 +167,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
   private final int waitOnShutdownInSeconds;
-  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
@@ -376,28 +372,44 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         LOG.warn(
           "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
       }
-      long oldFileLen = 0L;
       // It is at the safe point. Swap out writer from under the blocked writer thread.
+      // we will call rollWriter in init method, where we want to create the first writer and
+      // obviously the previous writer is null, so here we need this null check. And why we must
+      // call logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean
+      // after closing the writer asynchronously, we need to make sure the WALProps is put into
+      // walFile2Props before we call markClosedAndClean
       if (this.writer != null) {
-        oldFileLen = this.writer.getLength();
+        long oldFileLen = this.writer.getLength();
+        logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
         // In case of having unflushed entries or we already reached the
         // closeErrorsTolerated count, call the closeWriter inline rather than in async
         // way so that in case of an IOE we will throw it back and abort RS.
         inflightWALClosures.put(oldPath.getName(), writer);
         if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
-          closeWriter(this.writer, oldPath, true);
+          try {
+            closeWriter(this.writer, oldPath, true);
+          } finally {
+            inflightWALClosures.remove(oldPath.getName());
+          }
         } else {
           Writer localWriter = this.writer;
           closeExecutor.execute(() -> {
             try {
               closeWriter(localWriter, oldPath, false);
             } catch (IOException e) {
-              // We will never reach here.
+              LOG.warn("close old writer failed", e);
+            } finally {
+              // call this even if the above close fails, as there is no other chance we can set
+              // closed to true, it will not cause big problems.
+              markClosedAndClean(oldPath);
+              inflightWALClosures.remove(oldPath.getName());
             }
           });
         }
+      } else {
+        logRollAndSetupWalProps(oldPath, newPath, 0);
       }
-      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+
       this.writer = nextWriter;
       if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
         this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@@ -452,8 +464,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       }
       LOG.warn("Riding over failed WAL close of " + path
         + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
-    } finally {
-      inflightWALClosures.remove(path.getName());
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 7e5e33098c2..cfb2a4fddfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -397,10 +397,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
       serverName = ServerName.parseServerName(logDirName);
     } catch (IllegalArgumentException | IllegalStateException ex) {
       serverName = null;
-      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
+      LOG.warn("Cannot parse a server name from path={}", logFile, ex);
     }
-    if (serverName != null && serverName.getStartcode() < 0) {
-      LOG.warn("Invalid log file path=" + logFile);
+    if (serverName != null && serverName.getStartCode() < 0) {
+      LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile,
+        serverName.getStartCode());
       serverName = null;
     }
     return serverName;
@@ -465,6 +466,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     }
 
     ServerName serverName = getServerNameFromWALDirectoryName(path);
+    if (serverName == null) {
+      LOG.warn("Can not extract server name from path {}, "
+        + "give up searching the separated old log dir", path);
+      return null;
+    }
     // Try finding the log in separate old log dir
     oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
       .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index 03bdb7e8f86..58caf6f6613 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -419,9 +419,11 @@ public class TestAdmin2 extends TestAdminBase {
       r.flush(true);
     }
     ADMIN.rollWALWriter(regionServer.getServerName());
-    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
-    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
-    assertTrue(("actual count: " + count), count <= 2);
+    TEST_UTIL.waitFor(5000, () -> {
+      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
+      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
+      return count <= 2;
+    });
   }
 
   private void setUpforLogRolling() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
index 473e316b49f..30cb95d80d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
@@ -152,9 +152,11 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
       r.flush(true);
     }
     admin.rollWALWriter(regionServer.getServerName()).join();
-    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
-    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
-    assertTrue(("actual count: " + count), count <= 2);
+    TEST_UTIL.waitFor(5000, () -> {
+      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
+      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
+      return count <= 2;
+    });
   }
 
   private void setUpforLogRolling() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index e9eec9e5352..a36b2559208 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -488,9 +488,8 @@ public class TestPerColumnFamilyFlush {
         // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
         int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
         assertNull(getWAL(desiredRegion).rollWriter());
-        while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
-          Thread.sleep(100);
-        }
+        TEST_UTIL.waitFor(60000,
+          () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
       }
       assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
       assertTrue(
@@ -529,7 +528,7 @@ public class TestPerColumnFamilyFlush {
         desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
       // let WAL cleanOldLogs
       assertNull(getWAL(desiredRegion).rollWriter(true));
-      assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
+      TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
     } finally {
       TEST_UTIL.shutdownMiniCluster();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 3200ad2b58d..1f04e2718be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -253,21 +253,14 @@ public abstract class AbstractTestFSWAL {
     }
   }
 
-  /**
-   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
-   * regions which should be flushed in order to archive the oldest wal file.
-   * <p>
-   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
-   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
-   * flush on rolling the wal.
-   */
-  @Test
-  public void testFindMemStoresEligibleForFlush() throws Exception {
-    LOG.debug("testFindMemStoresEligibleForFlush");
-    Configuration conf1 = HBaseConfiguration.create(CONF);
-    conf1.setInt("hbase.regionserver.maxlogs", 1);
-    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
-      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
+  // now we will close asynchronously and will not archive a wal file unless it is fully closed, so
+  // sometimes we need to wait a bit before asserting, especially when you want to test the removal
+  // of numRolledLogFiles
+  private void waitNumRolledLogFiles(AbstractFSWAL<?> wal, int expected) {
+    TEST_UTIL.waitFor(5000, () -> wal.getNumRolledLogFiles() == expected);
+  }
+
+  private void testFindMemStoresEligibleForFlush(AbstractFSWAL<?> wal) throws IOException {
     String cf1 = "cf1";
     String cf2 = "cf2";
     String cf3 = "cf3";
@@ -278,7 +271,7 @@ public abstract class AbstractTestFSWAL {
     RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
     RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
 
-    List<ColumnFamilyDescriptor> cfs = new ArrayList();
+    List<ColumnFamilyDescriptor> cfs = new ArrayList<>();
     cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
     cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
     TableDescriptor t3 =
@@ -299,87 +292,101 @@ public abstract class AbstractTestFSWAL {
     for (byte[] fam : t3.getColumnFamilyNames()) {
       scopes3.put(fam, 0);
     }
-    try {
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
-      wal.rollWriter();
-      // add some more edits and roll the wal. This would reach the log number threshold
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
-      wal.rollWriter();
-      // with above rollWriter call, the max logs limit is reached.
-      assertTrue(wal.getNumRolledLogFiles() == 2);
-
-      // get the regions to flush; since there is only one region in the oldest wal, it should
-      // return only one region.
-      Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(1, regionsToFlush.size());
-      assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
-      // insert edits in second region
-      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
-      // get the regions to flush, it should still read region1.
-      regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(1, regionsToFlush.size());
-      assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
-      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
-      // remain.
-      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
-      wal.rollWriter();
-      // only one wal should remain now (that is for the second region).
-      assertEquals(1, wal.getNumRolledLogFiles());
-      // flush the second region
-      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
-      wal.rollWriter(true);
-      // no wal should remain now.
-      assertEquals(0, wal.getNumRolledLogFiles());
-      // add edits both to region 1 and region 2, and roll.
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
-      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
-      wal.rollWriter();
-      // add edits and roll the writer, to reach the max logs limit.
-      assertEquals(1, wal.getNumRolledLogFiles());
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
-      wal.rollWriter();
-      // it should return two regions to flush, as the oldest wal file has entries
-      // for both regions.
-      regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(2, regionsToFlush.size());
-      // flush both regions
-      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
-      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
-      wal.rollWriter(true);
-      assertEquals(0, wal.getNumRolledLogFiles());
-      // Add an edit to region1, and roll the wal.
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
-      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
-      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
-      wal.rollWriter();
-      wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
-      assertEquals(1, wal.getNumRolledLogFiles());
-
-      // clear test data
-      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
-      wal.rollWriter(true);
-      // add edits for three familes
-      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
-      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
-      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
-      wal.rollWriter();
-      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
-      wal.rollWriter();
-      assertEquals(2, wal.getNumRolledLogFiles());
-      // flush one family before archive oldest wal
-      Set<byte[]> flushedFamilyNames = new HashSet<>();
-      flushedFamilyNames.add(Bytes.toBytes(cf1));
-      flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
-      regionsToFlush = wal.findRegionsToForceFlush();
-      // then only two family need to be flushed when archive oldest wal
-      assertEquals(1, regionsToFlush.size());
-      assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
-      assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
-    } finally {
-      if (wal != null) {
-        wal.close();
-      }
+    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
+    wal.rollWriter();
+    // add some more edits and roll the wal. This would reach the log number threshold
+    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
+    wal.rollWriter();
+    // with above rollWriter call, the max logs limit is reached.
+    waitNumRolledLogFiles(wal, 2);
+
+    // get the regions to flush; since there is only one region in the oldest wal, it should
+    // return only one region.
+    Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
+    assertEquals(1, regionsToFlush.size());
+    assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
+    // insert edits in second region
+    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
+    // get the regions to flush, it should still read region1.
+    regionsToFlush = wal.findRegionsToForceFlush();
+    assertEquals(1, regionsToFlush.size());
+    assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
+    // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
+    // remain.
+    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
+    wal.rollWriter();
+    // only one wal should remain now (that is for the second region).
+    waitNumRolledLogFiles(wal, 1);
+    // flush the second region
+    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
+    wal.rollWriter(true);
+    // no wal should remain now.
+    waitNumRolledLogFiles(wal, 0);
+    // add edits both to region 1 and region 2, and roll.
+    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
+    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
+    wal.rollWriter();
+    // add edits and roll the writer, to reach the max logs limit.
+    waitNumRolledLogFiles(wal, 1);
+    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
+    wal.rollWriter();
+    // it should return two regions to flush, as the oldest wal file has entries
+    // for both regions.
+    regionsToFlush = wal.findRegionsToForceFlush();
+    assertEquals(2, regionsToFlush.size());
+    // flush both regions
+    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
+    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
+    wal.rollWriter(true);
+    waitNumRolledLogFiles(wal, 0);
+    // Add an edit to region1, and roll the wal.
+    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
+    // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
+    wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
+    wal.rollWriter();
+    wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+    waitNumRolledLogFiles(wal, 1);
+
+    // clear test data
+    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
+    wal.rollWriter(true);
+    // add edits for three familes
+    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
+    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
+    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
+    wal.rollWriter();
+    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
+    wal.rollWriter();
+    waitNumRolledLogFiles(wal, 2);
+    // flush one family before archive oldest wal
+    Set<byte[]> flushedFamilyNames = new HashSet<>();
+    flushedFamilyNames.add(Bytes.toBytes(cf1));
+    flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
+    regionsToFlush = wal.findRegionsToForceFlush();
+    // then only two family need to be flushed when archive oldest wal
+    assertEquals(1, regionsToFlush.size());
+    assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
+    assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
+  }
+
+  /**
+   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
+   * regions which should be flushed in order to archive the oldest wal file.
+   * <p>
+   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
+   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
+   * flush on rolling the wal.
+   */
+  @Test
+  public void testFindMemStoresEligibleForFlush() throws Exception {
+    LOG.debug("testFindMemStoresEligibleForFlush");
+    Configuration conf1 = HBaseConfiguration.create(CONF);
+    conf1.setInt("hbase.regionserver.maxlogs", 1);
+    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
+      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null)) {
+      testFindMemStoresEligibleForFlush(wal);
     }
+
   }
 
   @Test(expected = IOException.class)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 90480af0054..722f5ce5bc2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -137,9 +139,7 @@ public abstract class AbstractTestLogRolling {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  protected void startAndWriteData() throws IOException, InterruptedException {
-    // When the hbase:meta table can be opened, the region servers are running
-    TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+  private void startAndWriteData() throws IOException, InterruptedException {
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
 
     Table table = createTestTable(this.tableName);
@@ -175,19 +175,6 @@ public abstract class AbstractTestLogRolling {
     }
   }
 
-  private void assertLogFileSize(WAL log) throws InterruptedException {
-    if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
-      assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
-    } else {
-      for (int i = 0; i < 10; i++) {
-        if (AbstractFSWALProvider.getLogFileSize(log) != 0) {
-          Thread.sleep(10);
-        }
-      }
-      assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
-    }
-  }
-
   /**
    * Tests that logs are deleted
    */
@@ -200,20 +187,25 @@ public abstract class AbstractTestLogRolling {
     final WAL log = server.getWAL(region);
     LOG.info(
       "after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files");
-    assertLogFileSize(log);
+
+    // roll the log, so we should have at least one rolled file and the log file size should be
+    // greater than 0, in case in the above method we rolled in the last round and then flushed so
+    // all the old wal files are deleted and cause the below assertion to fail
+    log.rollWriter();
+
+    assertThat(AbstractFSWALProvider.getLogFileSize(log), greaterThan(0L));
 
     // flush all regions
     for (HRegion r : server.getOnlineRegionsLocalContext()) {
       r.flush(true);
     }
 
-    // Now roll the log
+    // Now roll the log again
     log.rollWriter();
 
-    int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
-    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
-    assertTrue(("actual count: " + count), count <= 2);
-    assertLogFileSize(log);
+    // should have deleted all the rolled wal files
+    TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(log) == 0);
+    assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
   }
 
   protected String getName() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
index cdc9757ed45..ce490cfaef5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
@@ -112,10 +112,6 @@ public class TestFSHLogProvider {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  static String getName() {
-    return "TestDefaultWALProvider";
-  }
-
   @Test
   public void testGetServerNameFromWALDirectoryName() throws IOException {
     ServerName sn = ServerName.valueOf("hn", 450, 1398);
@@ -163,7 +159,7 @@ public class TestFSHLogProvider {
   /**
    * used by TestDefaultWALProviderWithHLogKey
    */
-  WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp,
+  private WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp,
     NavigableMap<byte[], Integer> scopes) {
     return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes);
   }
@@ -171,14 +167,19 @@ public class TestFSHLogProvider {
   /**
    * helper method to simulate region flush for a WAL.
    */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+  private void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
     wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
     wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
   }
 
-  @Test
-  public void testLogCleaning() throws Exception {
-    LOG.info(currentTest.getMethodName());
+  // now we will close asynchronously and will not archive a wal file unless it is fully closed, so
+  // sometimes we need to wait a bit before asserting, especially when you want to test the removal
+  // of numRolledLogFiles
+  private void waitNumRolledLogFiles(WAL wal, int expected) {
+    TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(wal) == expected);
+  }
+
+  private void testLogCleaning(WALFactory wals) throws IOException {
     TableDescriptor htd =
       TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
         .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
@@ -193,71 +194,62 @@ public class TestFSHLogProvider {
     for (byte[] fam : htd2.getColumnFamilyNames()) {
       scopes2.put(fam, 0);
     }
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build();
+    // we want to mix edits from regions, so pick our own identifier.
+    WAL log = wals.getWAL(null);
+
+    // Add a single edit and make sure that rolling won't remove the file
+    // Before HBASE-3198 it used to delete it
+    addEdits(log, hri, htd, 1, scopes1);
+    log.rollWriter();
+    waitNumRolledLogFiles(log, 1);
+
+    // See if there's anything wrong with more than 1 edit
+    addEdits(log, hri, htd, 2, scopes1);
+    log.rollWriter();
+    assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log));
+
+    // Now mix edits from 2 regions, still no flushing
+    addEdits(log, hri, htd, 1, scopes1);
+    addEdits(log, hri2, htd2, 1, scopes2);
+    addEdits(log, hri, htd, 1, scopes1);
+    addEdits(log, hri2, htd2, 1, scopes2);
+    log.rollWriter();
+    waitNumRolledLogFiles(log, 3);
+
+    // Flush the first region, we expect to see the first two files getting
+    // archived. We need to append something or writer won't be rolled.
+    addEdits(log, hri2, htd2, 1, scopes2);
+    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+    log.rollWriter();
+    waitNumRolledLogFiles(log, 2);
+
+    // Flush the second region, which removes all the remaining output files
+    // since the oldest was completely flushed and the two others only contain
+    // flush information
+    addEdits(log, hri2, htd2, 1, scopes2);
+    log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
+    log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+    log.rollWriter();
+    waitNumRolledLogFiles(log, 0);
+  }
+
+  @Test
+  public void testLogCleaning() throws Exception {
+    LOG.info(currentTest.getMethodName());
     Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
     WALFactory wals = new WALFactory(localConf, currentTest.getMethodName());
     try {
-      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
-      RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build();
-      // we want to mix edits from regions, so pick our own identifier.
-      WAL log = wals.getWAL(null);
-
-      // Add a single edit and make sure that rolling won't remove the file
-      // Before HBASE-3198 it used to delete it
-      addEdits(log, hri, htd, 1, scopes1);
-      log.rollWriter();
-      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log));
-
-      // See if there's anything wrong with more than 1 edit
-      addEdits(log, hri, htd, 2, scopes1);
-      log.rollWriter();
-      assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log));
-
-      // Now mix edits from 2 regions, still no flushing
-      addEdits(log, hri, htd, 1, scopes1);
-      addEdits(log, hri2, htd2, 1, scopes2);
-      addEdits(log, hri, htd, 1, scopes1);
-      addEdits(log, hri2, htd2, 1, scopes2);
-      log.rollWriter();
-      assertEquals(3, AbstractFSWALProvider.getNumRolledLogFiles(log));
-
-      // Flush the first region, we expect to see the first two files getting
-      // archived. We need to append something or writer won't be rolled.
-      addEdits(log, hri2, htd2, 1, scopes2);
-      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
-      log.rollWriter();
-      int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
-      assertEquals(2, count);
-
-      // Flush the second region, which removes all the remaining output files
-      // since the oldest was completely flushed and the two others only contain
-      // flush information
-      addEdits(log, hri2, htd2, 1, scopes2);
-      log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
-      log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
-      log.rollWriter();
-      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
+      testLogCleaning(wals);
     } finally {
-      if (wals != null) {
-        wals.close();
-      }
+      wals.close();
     }
   }
 
-  /**
-   * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and
-   * also don't archive "live logs" (that is, a log with un-flushed entries).
-   * <p>
-   * This is what it does: It creates two regions, and does a series of inserts along with log
-   * rolling. Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is
-   * eligible for archiving if for all the regions which have entries in that wal file, have flushed
-   * - past their maximum sequence id in that wal file.
-   * <p>
-   */
-  @Test
-  public void testWALArchiving() throws IOException {
-    LOG.debug(currentTest.getMethodName());
+  private void testWALArchiving(WALFactory wals) throws IOException {
     TableDescriptor table1 =
       TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "1"))
         .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
@@ -272,58 +264,73 @@ public class TestFSHLogProvider {
     for (byte[] fam : table2.getColumnFamilyNames()) {
       scopes2.put(fam, 0);
     }
+    WAL wal = wals.getWAL(null);
+    assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build();
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build();
+    // variables to mock region sequenceIds.
+    // start with the testing logic: insert a waledit, and roll writer
+    addEdits(wal, hri1, table1, 1, scopes1);
+    wal.rollWriter();
+    // assert that the wal is rolled
+    waitNumRolledLogFiles(wal, 1);
+    // add edits in the second wal file, and roll writer.
+    addEdits(wal, hri1, table1, 1, scopes1);
+    wal.rollWriter();
+    // assert that the wal is rolled
+    waitNumRolledLogFiles(wal, 2);
+    // add a waledit to table1, and flush the region.
+    addEdits(wal, hri1, table1, 3, scopes1);
+    flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
+    // roll log; all old logs should be archived.
+    wal.rollWriter();
+    waitNumRolledLogFiles(wal, 0);
+    // add an edit to table2, and roll writer
+    addEdits(wal, hri2, table2, 1, scopes2);
+    wal.rollWriter();
+    waitNumRolledLogFiles(wal, 1);
+    // add edits for table1, and roll writer
+    addEdits(wal, hri1, table1, 2, scopes1);
+    wal.rollWriter();
+    waitNumRolledLogFiles(wal, 2);
+    // add edits for table2, and flush hri1.
+    addEdits(wal, hri2, table2, 2, scopes2);
+    flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
+    // the log : region-sequenceId map is
+    // log1: region2 (unflushed)
+    // log2: region1 (flushed)
+    // log3: region2 (unflushed)
+    // roll the writer; log2 should be archived.
+    wal.rollWriter();
+    waitNumRolledLogFiles(wal, 2);
+    // flush region2, and all logs should be archived.
+    addEdits(wal, hri2, table2, 2, scopes2);
+    flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
+    wal.rollWriter();
+    waitNumRolledLogFiles(wal, 0);
+  }
+
+  /**
+   * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and
+   * also don't archive "live logs" (that is, a log with un-flushed entries).
+   * <p>
+   * This is what it does: It creates two regions, and does a series of inserts along with log
+   * rolling. Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is
+   * eligible for archiving if for all the regions which have entries in that wal file, have flushed
+   * - past their maximum sequence id in that wal file.
+   * <p>
+   */
+  @Test
+  public void testWALArchiving() throws IOException {
+    LOG.debug(currentTest.getMethodName());
+
     Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
     WALFactory wals = new WALFactory(localConf, currentTest.getMethodName());
     try {
-      WAL wal = wals.getWAL(null);
-      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
-      RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build();
-      RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build();
-      // variables to mock region sequenceIds.
-      // start with the testing logic: insert a waledit, and roll writer
-      addEdits(wal, hri1, table1, 1, scopes1);
-      wal.rollWriter();
-      // assert that the wal is rolled
-      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
-      // add edits in the second wal file, and roll writer.
-      addEdits(wal, hri1, table1, 1, scopes1);
-      wal.rollWriter();
-      // assert that the wal is rolled
-      assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
-      // add a waledit to table1, and flush the region.
-      addEdits(wal, hri1, table1, 3, scopes1);
-      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
-      // roll log; all old logs should be archived.
-      wal.rollWriter();
-      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
-      // add an edit to table2, and roll writer
-      addEdits(wal, hri2, table2, 1, scopes2);
-      wal.rollWriter();
-      assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
-      // add edits for table1, and roll writer
-      addEdits(wal, hri1, table1, 2, scopes1);
-      wal.rollWriter();
-      assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
-      // add edits for table2, and flush hri1.
-      addEdits(wal, hri2, table2, 2, scopes2);
-      flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
-      // the log : region-sequenceId map is
-      // log1: region2 (unflushed)
-      // log2: region1 (flushed)
-      // log3: region2 (unflushed)
-      // roll the writer; log2 should be archived.
-      wal.rollWriter();
-      assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
-      // flush region2, and all logs should be archived.
-      addEdits(wal, hri2, table2, 2, scopes2);
-      flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
-      wal.rollWriter();
-      assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
+      testWALArchiving(wals);
     } finally {
-      if (wals != null) {
-        wals.close();
-      }
+      wals.close();
     }
   }