You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/09/13 01:48:36 UTC

[hbase] 01/02: Revert "HBASE-25891 Remove dependence on storing WAL filenames for backup (#3359)"

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit fc5a8fd3bf6d520cef87ec63dd50cba94f8a207d
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Sep 13 09:47:25 2021 +0800

    Revert "HBASE-25891 Remove dependence on storing WAL filenames for backup (#3359)"
    
    This reverts commit df57b1ca6b990f69c5edd08862dbc9956965b45e.
---
 .../org/apache/hadoop/hbase/backup/BackupInfo.java |  51 ++---
 .../hadoop/hbase/backup/impl/BackupManager.java    |  23 ++-
 .../hadoop/hbase/backup/impl/BackupManifest.java   |  20 +-
 .../hbase/backup/impl/BackupSystemTable.java       | 207 ++++++++++++++++++++-
 .../hbase/backup/impl/FullTableBackupClient.java   |  12 +-
 .../backup/impl/IncrementalBackupManager.java      | 132 ++++++++++++-
 .../backup/impl/IncrementalTableBackupClient.java  |   8 +-
 .../hbase/backup/impl/TableBackupClient.java       |   5 +-
 .../hbase/backup/master/BackupLogCleaner.java      |  98 ++++------
 .../hadoop/hbase/backup/util/BackupUtils.java      |  19 +-
 .../apache/hadoop/hbase/backup/TestBackupBase.java |  47 +++--
 .../hadoop/hbase/backup/TestBackupSystemTable.java |  49 ++++-
 .../hbase/backup/master/TestBackupLogCleaner.java  |  49 ++++-
 .../src/main/protobuf/Backup.proto                 |   4 -
 14 files changed, 537 insertions(+), 187 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index d8a6940..6c304ca 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos.BackupInfo.Builder;
@@ -134,13 +136,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
    * New region server log timestamps for table set after distributed log roll key - table name,
    * value - map of RegionServer hostname -> last log rolled timestamp
    */
-  private Map<TableName, Map<String, Long>> tableSetTimestampMap;
-
-  /**
-   * Previous Region server log timestamps for table set after distributed log roll key -
-   * table name, value - map of RegionServer hostname -> last log rolled timestamp
-   */
-  private Map<TableName, Map<String, Long>> incrTimestampMap;
+  private HashMap<TableName, HashMap<String, Long>> tableSetTimestampMap;
 
   /**
    * Backup progress in %% (0-100)
@@ -194,12 +190,12 @@ public class BackupInfo implements Comparable<BackupInfo> {
     this.backupTableInfoMap = backupTableInfoMap;
   }
 
-  public Map<TableName, Map<String, Long>> getTableSetTimestampMap() {
+  public HashMap<TableName, HashMap<String, Long>> getTableSetTimestampMap() {
     return tableSetTimestampMap;
   }
 
-  public void setTableSetTimestampMap(Map<TableName,
-          Map<String, Long>> tableSetTimestampMap) {
+  public void setTableSetTimestampMap(HashMap<TableName,
+          HashMap<String, Long>> tableSetTimestampMap) {
     this.tableSetTimestampMap = tableSetTimestampMap;
   }
 
@@ -355,19 +351,19 @@ public class BackupInfo implements Comparable<BackupInfo> {
 
   /**
    * Set the new region server log timestamps after distributed log roll
-   * @param prevTableSetTimestampMap table timestamp map
+   * @param newTableSetTimestampMap table timestamp map
    */
-  public void setIncrTimestampMap(Map<TableName,
-          Map<String, Long>> prevTableSetTimestampMap) {
-    this.incrTimestampMap = prevTableSetTimestampMap;
+  public void setIncrTimestampMap(HashMap<TableName,
+          HashMap<String, Long>> newTableSetTimestampMap) {
+    this.tableSetTimestampMap = newTableSetTimestampMap;
   }
 
   /**
    * Get new region server log timestamps after distributed log roll
    * @return new region server log timestamps
    */
-  public Map<TableName, Map<String, Long>> getIncrTimestampMap() {
-    return this.incrTimestampMap;
+  public HashMap<TableName, HashMap<String, Long>> getIncrTimestampMap() {
+    return this.tableSetTimestampMap;
   }
 
   public TableName getTableBySnapshot(String snapshotName) {
@@ -383,7 +379,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
     BackupProtos.BackupInfo.Builder builder = BackupProtos.BackupInfo.newBuilder();
     builder.setBackupId(getBackupId());
     setBackupTableInfoMap(builder);
-    setTableSetTimestampMap(builder);
     builder.setCompleteTs(getCompleteTs());
     if (getFailedMsg() != null) {
       builder.setFailedMessage(getFailedMsg());
@@ -451,16 +446,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
     }
   }
 
-  private void setTableSetTimestampMap(Builder builder) {
-    if (this.getTableSetTimestampMap() != null) {
-      for (Entry<TableName, Map<String, Long>> entry : this.getTableSetTimestampMap().entrySet()) {
-        builder.putTableSetTimestamp(entry.getKey().getNameAsString(),
-          BackupProtos.BackupInfo.RSTimestampMap.newBuilder().putAllRsTimestamp(entry.getValue())
-            .build());
-      }
-    }
-  }
-
   public static BackupInfo fromByteArray(byte[] data) throws IOException {
     return fromProto(BackupProtos.BackupInfo.parseFrom(data));
   }
@@ -473,7 +458,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
     BackupInfo context = new BackupInfo();
     context.setBackupId(proto.getBackupId());
     context.setBackupTableInfoMap(toMap(proto.getBackupTableInfoList()));
-    context.setTableSetTimestampMap(getTableSetTimestampMap(proto.getTableSetTimestampMap()));
     context.setCompleteTs(proto.getCompleteTs());
     if (proto.hasFailedMessage()) {
       context.setFailedMsg(proto.getFailedMessage());
@@ -507,17 +491,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
     return map;
   }
 
-  private static Map<TableName, Map<String, Long>> getTableSetTimestampMap(
-    Map<String, BackupProtos.BackupInfo.RSTimestampMap> map) {
-    Map<TableName, Map<String, Long>> tableSetTimestampMap = new HashMap<>();
-    for (Entry<String, BackupProtos.BackupInfo.RSTimestampMap> entry : map.entrySet()) {
-      tableSetTimestampMap
-        .put(TableName.valueOf(entry.getKey()), entry.getValue().getRsTimestampMap());
-    }
-
-    return tableSetTimestampMap;
-  }
-
   public String getShortDescription() {
     StringBuilder sb = new StringBuilder();
     sb.append("{");
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 08494f0..79c702c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -481,7 +482,7 @@ public class BackupManager implements Closeable {
    * @throws IOException exception
    */
   public void writeRegionServerLogTimestamp(Set<TableName> tables,
-      Map<String, Long> newTimestamps) throws IOException {
+      HashMap<String, Long> newTimestamps) throws IOException {
     systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
   }
 
@@ -492,7 +493,7 @@ public class BackupManager implements Closeable {
    *         RegionServer,PreviousTimeStamp
    * @throws IOException exception
    */
-  public Map<TableName, Map<String, Long>> readLogTimestampMap() throws IOException {
+  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap() throws IOException {
     return systemTable.readLogTimestampMap(backupInfo.getBackupRootDir());
   }
 
@@ -514,6 +515,24 @@ public class BackupManager implements Closeable {
     systemTable.addIncrementalBackupTableSet(tables, backupInfo.getBackupRootDir());
   }
 
+  /**
+   * Saves list of WAL files after incremental backup operation. These files will be stored until
+   * TTL expiration and are used by Backup Log Cleaner plug-in to determine which WAL files can be
+   * safely purged.
+   */
+  public void recordWALFiles(List<String> files) throws IOException {
+    systemTable.addWALFiles(files, backupInfo.getBackupId(), backupInfo.getBackupRootDir());
+  }
+
+  /**
+   * Get WAL files iterator.
+   * @return WAL files iterator from backup system table
+   * @throws IOException if getting the WAL files iterator fails
+   */
+  public Iterator<BackupSystemTable.WALItem> getWALFilesFromBackupSystem() throws IOException {
+    return systemTable.getWALFilesIterator(backupInfo.getBackupRootDir());
+  }
+
   public Connection getConnection() {
     return conn;
   }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index 4d4965d..049d38b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -116,7 +116,7 @@ public class BackupManifest {
     private long startTs;
     private long completeTs;
     private ArrayList<BackupImage> ancestors;
-    private Map<TableName, Map<String, Long>> incrTimeRanges;
+    private HashMap<TableName, HashMap<String, Long>> incrTimeRanges;
 
     static Builder newBuilder() {
       return new Builder();
@@ -187,11 +187,11 @@ public class BackupManifest {
       return builder.build();
     }
 
-    private static Map<TableName, Map<String, Long>> loadIncrementalTimestampMap(
+    private static HashMap<TableName, HashMap<String, Long>> loadIncrementalTimestampMap(
         BackupProtos.BackupImage proto) {
       List<BackupProtos.TableServerTimestamp> list = proto.getTstMapList();
 
-      Map<TableName, Map<String, Long>> incrTimeRanges = new HashMap<>();
+      HashMap<TableName, HashMap<String, Long>> incrTimeRanges = new HashMap<>();
 
       if (list == null || list.size() == 0) {
         return incrTimeRanges;
@@ -199,7 +199,7 @@ public class BackupManifest {
 
       for (BackupProtos.TableServerTimestamp tst : list) {
         TableName tn = ProtobufUtil.toTableName(tst.getTableName());
-        Map<String, Long> map = incrTimeRanges.get(tn);
+        HashMap<String, Long> map = incrTimeRanges.get(tn);
         if (map == null) {
           map = new HashMap<>();
           incrTimeRanges.put(tn, map);
@@ -217,9 +217,9 @@ public class BackupManifest {
       if (this.incrTimeRanges == null) {
         return;
       }
-      for (Entry<TableName, Map<String, Long>> entry : this.incrTimeRanges.entrySet()) {
+      for (Entry<TableName, HashMap<String, Long>> entry : this.incrTimeRanges.entrySet()) {
         TableName key = entry.getKey();
-        Map<String, Long> value = entry.getValue();
+        HashMap<String, Long> value = entry.getValue();
         BackupProtos.TableServerTimestamp.Builder tstBuilder =
             BackupProtos.TableServerTimestamp.newBuilder();
         tstBuilder.setTableName(ProtobufUtil.toProtoTableName(key));
@@ -359,11 +359,11 @@ public class BackupManifest {
       return hash;
     }
 
-    public Map<TableName, Map<String, Long>> getIncrTimeRanges() {
+    public HashMap<TableName, HashMap<String, Long>> getIncrTimeRanges() {
       return incrTimeRanges;
     }
 
-    private void setIncrTimeRanges(Map<TableName, Map<String, Long>> incrTimeRanges) {
+    private void setIncrTimeRanges(HashMap<TableName, HashMap<String, Long>> incrTimeRanges) {
       this.incrTimeRanges = incrTimeRanges;
     }
   }
@@ -512,11 +512,11 @@ public class BackupManifest {
    * Set the incremental timestamp map directly.
    * @param incrTimestampMap timestamp map
    */
-  public void setIncrTimestampMap(Map<TableName, Map<String, Long>> incrTimestampMap) {
+  public void setIncrTimestampMap(HashMap<TableName, HashMap<String, Long>> incrTimestampMap) {
     this.backupImage.setIncrTimeRanges(incrTimestampMap);
   }
 
-  public Map<TableName, Map<String, Long>> getIncrTimestampMap() {
+  public Map<TableName, HashMap<String, Long>> getIncrTimestampMap() {
     return backupImage.getIncrTimeRanges();
   }
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 88093ba..3183ff4 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -34,9 +34,11 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -63,12 +65,14 @@ import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
@@ -174,10 +178,11 @@ public final class BackupSystemTable implements Closeable {
   final static byte[] BL_PREPARE = Bytes.toBytes("R");
   final static byte[] BL_COMMIT = Bytes.toBytes("D");
 
+  private final static String WALS_PREFIX = "wals:";
   private final static String SET_KEY_PREFIX = "backupset:";
 
   // separator between BULK_LOAD_PREFIX and ordinals
-  private final static String BLK_LD_DELIM = ":";
+  protected final static String BLK_LD_DELIM = ":";
   private final static byte[] EMPTY_VALUE = new byte[] {};
 
   // Safe delimiter in a string
@@ -853,7 +858,7 @@ public final class BackupSystemTable implements Closeable {
    * @throws IOException exception
    */
   public void writeRegionServerLogTimestamp(Set<TableName> tables,
-      Map<String, Long> newTimestamps, String backupRoot) throws IOException {
+      HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("write RS log time stamps to backup system table for tables ["
           + StringUtils.join(tables, ",") + "]");
@@ -878,13 +883,13 @@ public final class BackupSystemTable implements Closeable {
    *         RegionServer,PreviousTimeStamp
    * @throws IOException exception
    */
-  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
+  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
       throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
     }
 
-    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
+    HashMap<TableName, HashMap<String, Long>> tableTimestampMap = new HashMap<>();
 
     Scan scan = createScanForReadLogTimestampMap(backupRoot);
     try (Table table = connection.getTable(tableName);
@@ -1007,6 +1012,148 @@ public final class BackupSystemTable implements Closeable {
   }
 
   /**
+   * Register WAL files as eligible for deletion
+   * @param files files
+   * @param backupId backup id
+   * @param backupRoot root directory path to backup destination
+   * @throws IOException exception
+   */
+  public void addWALFiles(List<String> files, String backupId, String backupRoot)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files ["
+          + StringUtils.join(files, ",") + "]");
+    }
+    if (LOG.isDebugEnabled()) {
+      files.forEach(file -> LOG.debug("add :" + file));
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
+      table.put(puts);
+    }
+  }
+
+  /**
+   * Get an iterator over the WAL files recorded in the backup system table
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
+    LOG.trace("get WAL files from backup system table");
+
+    final Table table = connection.getTable(tableName);
+    Scan scan = createScanForGetWALs(backupRoot);
+    final ResultScanner scanner = table.getScanner(scan);
+    final Iterator<Result> it = scanner.iterator();
+    return new Iterator<WALItem>() {
+
+      @Override
+      public boolean hasNext() {
+        boolean next = it.hasNext();
+        if (!next) {
+          // close all
+          try {
+            scanner.close();
+            table.close();
+          } catch (IOException e) {
+            LOG.error("Close WAL Iterator", e);
+          }
+        }
+        return next;
+      }
+
+      @Override
+      public WALItem next() {
+        Result next = it.next();
+        List<Cell> cells = next.listCells();
+        byte[] buf = cells.get(0).getValueArray();
+        int len = cells.get(0).getValueLength();
+        int offset = cells.get(0).getValueOffset();
+        String backupId = new String(buf, offset, len);
+        buf = cells.get(1).getValueArray();
+        len = cells.get(1).getValueLength();
+        offset = cells.get(1).getValueOffset();
+        String walFile = new String(buf, offset, len);
+        buf = cells.get(2).getValueArray();
+        len = cells.get(2).getValueLength();
+        offset = cells.get(2).getValueOffset();
+        String backupRoot = new String(buf, offset, len);
+        return new WALItem(backupId, walFile, backupRoot);
+      }
+
+      @Override
+      public void remove() {
+        // not implemented
+        throw new RuntimeException("remove is not supported");
+      }
+    };
+  }
+
+  /**
+   * Check if WAL file is eligible for deletion. Future: to support all backup destinations
+   * @param file name of a file to check
+   * @return true, if deletable, false otherwise.
+   * @throws IOException exception
+   */
+  // TODO: multiple backup destination support
+  public boolean isWALFileDeletable(String file) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Check if WAL file has been already backed up in backup system table " + file);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Get get = createGetForCheckWALFile(file);
+      Result res = table.get(get);
+      return (!res.isEmpty());
+    }
+  }
+
+  /**
+   * Check if WAL file is eligible for deletion using multi-get
+   * @param files file statuses of the WAL files to check
+   * @return map of results (key: FileStatus object. value: true if the file is deletable, false
+   *         otherwise)
+   * @throws IOException exception
+   */
+  public Map<FileStatus, Boolean> areWALFilesDeletable(Iterable<FileStatus> files)
+      throws IOException {
+    final int BUF_SIZE = 100;
+
+    Map<FileStatus, Boolean> ret = new HashMap<>();
+    try (Table table = connection.getTable(tableName)) {
+      List<Get> getBuffer = new ArrayList<>();
+      List<FileStatus> fileStatuses = new ArrayList<>();
+
+      for (FileStatus file : files) {
+        String fn = file.getPath().getName();
+        if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
+          ret.put(file, true);
+          continue;
+        }
+        String wal = file.getPath().toString();
+        Get get = createGetForCheckWALFile(wal);
+        getBuffer.add(get);
+        fileStatuses.add(file);
+        if (getBuffer.size() >= BUF_SIZE) {
+          Result[] results = table.get(getBuffer);
+          for (int i = 0; i < results.length; i++) {
+            ret.put(fileStatuses.get(i), !results[i].isEmpty());
+          }
+          getBuffer.clear();
+          fileStatuses.clear();
+        }
+      }
+
+      if (!getBuffer.isEmpty()) {
+        Result[] results = table.get(getBuffer);
+        for (int i = 0; i < results.length; i++) {
+          ret.put(fileStatuses.get(i), !results[i].isEmpty());
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
    * Checks if we have at least one backup session in backup system table This API is used by
    * BackupLogCleaner
    * @return true, if - at least one session exists in backup system table table
@@ -1506,7 +1653,7 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
+  protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
     List<SnapshotDescription> list = admin.listSnapshots();
     for (SnapshotDescription desc : list) {
       if (desc.getName().equals(snapshotName)) {
@@ -1761,6 +1908,56 @@ public final class BackupSystemTable implements Closeable {
   }
 
   /**
+   * Creates put list for list of WAL files
+   * @param files list of WAL file paths
+   * @param backupId backup id
+   * @return put list
+   */
+  private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
+      String backupRoot) {
+    List<Put> puts = new ArrayList<>(files.size());
+    for (String file : files) {
+      Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
+        Bytes.toBytes(backupId));
+      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file));
+      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"),
+        Bytes.toBytes(backupRoot));
+      puts.add(put);
+    }
+    return puts;
+  }
+
+  /**
+   * Creates Scan operation to load WALs
+   * @param backupRoot path to backup destination
+   * @return scan operation
+   */
+  private Scan createScanForGetWALs(String backupRoot) {
+    // TODO: support for backupRoot
+    Scan scan = new Scan();
+    byte[] startRow = Bytes.toBytes(WALS_PREFIX);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.withStartRow(startRow);
+    scan.withStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    return scan;
+  }
+
+  /**
+   * Creates Get operation for a given WAL file name. TODO: support for backup destination
+   * @param file file
+   * @return get operation
+   */
+  private Get createGetForCheckWALFile(String file) {
+    Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+    // add backup root column
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    return get;
+  }
+
+  /**
    * Creates Scan operation to load backup set list
    * @return scan operation
    */
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 6ad409e..5bf1373 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CON
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.TableName;
@@ -159,6 +160,14 @@ public class FullTableBackupClient extends TableBackupClient {
         LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
 
       newTimestamps = backupManager.readRegionServerLastLogRollResult();
+      if (firstBackup) {
+        // Updates registered log files
+        // We record ALL old WAL files as registered, because
+        // this is a first full backup in the system and these
+        // files are not needed for next incremental backup
+        List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
+        backupManager.recordWALFiles(logFiles);
+      }
 
       // SNAPSHOT_TABLES:
       backupInfo.setPhase(BackupPhase.SNAPSHOT);
@@ -186,10 +195,9 @@ public class FullTableBackupClient extends TableBackupClient {
       // For incremental backup, it contains the incremental backup table set.
       backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
 
-      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
           backupManager.readLogTimestampMap();
 
-      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
       Long newStartCode =
           BackupUtils.getMinValue(BackupUtils
               .getRSLogTimestampMins(newTableSetTimestampMap));
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index 847837f..93d264a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.hbase.backup.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
 import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
@@ -60,16 +64,16 @@ public class IncrementalBackupManager extends BackupManager {
    * @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
    * @throws IOException exception
    */
-  public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
+  public HashMap<String, Long> getIncrBackupLogFileMap() throws IOException {
     List<String> logList;
-    Map<String, Long> newTimestamps;
-    Map<String, Long> previousTimestampMins;
+    HashMap<String, Long> newTimestamps;
+    HashMap<String, Long> previousTimestampMins;
 
     String savedStartCode = readBackupStartCode();
 
     // key: tableName
     // value: <RegionServer,PreviousTimeStamp>
-    Map<TableName, Map<String, Long>> previousTimestampMap = readLogTimestampMap();
+    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
 
     previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
 
@@ -95,19 +99,68 @@ public class IncrementalBackupManager extends BackupManager {
     newTimestamps = readRegionServerLastLogRollResult();
 
     logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
-    logList = excludeProcV2WALs(logList);
+    List<WALItem> logFromSystemTable =
+        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+            .getBackupRootDir());
+    logList = excludeAlreadyBackedUpAndProcV2WALs(logList, logFromSystemTable);
     backupInfo.setIncrBackupFileList(logList);
 
     return newTimestamps;
   }
 
-  private List<String> excludeProcV2WALs(List<String> logList) {
+  /**
+   * Get list of WAL files eligible for incremental backup.
+   *
+   * @return list of WAL files
+   * @throws IOException if getting the list of WAL files fails
+   */
+  public List<String> getIncrBackupLogFileList() throws IOException {
+    List<String> logList;
+    HashMap<String, Long> newTimestamps;
+    HashMap<String, Long> previousTimestampMins;
+
+    String savedStartCode = readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+    }
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null || previousTimestampMins == null
+        || previousTimestampMins.isEmpty()) {
+      throw new IOException(
+          "Cannot read any previous back up timestamps from backup system table. "
+              + "In order to create an incremental backup, at least one full backup is needed.");
+    }
+
+    newTimestamps = readRegionServerLastLogRollResult();
+
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    List<WALItem> logFromSystemTable =
+        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+            .getBackupRootDir());
+
+    logList = excludeAlreadyBackedUpAndProcV2WALs(logList, logFromSystemTable);
+    backupInfo.setIncrBackupFileList(logList);
+
+    return logList;
+  }
+
+  private List<String> excludeAlreadyBackedUpAndProcV2WALs(List<String> logList,
+      List<WALItem> logFromSystemTable) {
+    Set<String> walFileNameSet = convertToSet(logFromSystemTable);
+
     List<String> list = new ArrayList<>();
     for (int i=0; i < logList.size(); i++) {
       Path p = new Path(logList.get(i));
       String name  = p.getName();
 
-      if (name.startsWith(WALProcedureStore.LOG_PREFIX)) {
+      if (walFileNameSet.contains(name) || name.startsWith(WALProcedureStore.LOG_PREFIX)) {
         continue;
       }
 
@@ -117,6 +170,65 @@ public class IncrementalBackupManager extends BackupManager {
   }
 
   /**
+   * Create Set of WAL file names (not full path names)
+   * @param logFromSystemTable the logs from the system table to convert
+   * @return set of WAL file names
+   */
+  private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
+    // Keep only the final path component of every recorded WAL location.
+    Set<String> walNames = new HashSet<>();
+    for (WALItem entry : logFromSystemTable) {
+      walNames.add(new Path(entry.walFile).getName());
+    }
+    return walNames;
+  }
+
+  /**
+   * For each region server: get all log files newer than the last timestamps, but not newer than
+   * the newest timestamps.
+   * @param olderTimestamps timestamp map for each region server of the last backup.
+   * @param newestTimestamps timestamp map for each region server that the backup should lead to.
+   * @return list of log files that need to be added to this backup
+   * @throws IOException if getting the WAL files from the backup system fails
+   */
+  private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps,
+      HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException {
+    List<WALItem> selected = new ArrayList<>();
+    Iterator<WALItem> walItems = getWALFilesFromBackupSystem();
+    while (walItems.hasNext()) {
+      WALItem candidate = walItems.next();
+      // Skip WALs that were recorded for a different backup root.
+      if (!candidate.getBackupRoot().equals(backupRoot)) {
+        continue;
+      }
+      String walPath = candidate.getWalFile();
+      String host = BackupUtils.parseHostNameFromLogFile(new Path(walPath));
+      if (host == null) {
+        // No host name could be derived from this WAL path.
+        continue;
+      }
+      Long walTs = getTimestamp(walPath);
+      Long lowerBound = olderTimestamps.get(host);
+      Long upperBound = newestTimestamps.get(host);
+      if (upperBound == null) {
+        // No newest timestamp for this server: leave the range open-ended.
+        upperBound = Long.MAX_VALUE;
+      }
+      // A WAL without a previous timestamp is always taken; otherwise it has to
+      // lie strictly between the previous and the newest timestamp.
+      if (lowerBound == null || (walTs > lowerBound && walTs < upperBound)) {
+        selected.add(candidate);
+      }
+    }
+    return selected;
+  }
+
+  private Long getTimestamp(String walFileName) {
+    int separatorPos = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR);
+    return Long.valueOf(walFileName.substring(separatorPos + 1));
+  }
+
+  /**
    * For each region server: get all log files newer than the last timestamps but not newer than the
    * newest timestamps.
    * @param olderTimestamps the timestamp for each region server of the last backup.
@@ -126,8 +238,8 @@ public class IncrementalBackupManager extends BackupManager {
    * @return a list of log files to be backed up
    * @throws IOException exception
    */
-  private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
-      Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
+  private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps,
+      HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
       throws IOException {
     LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
         + "\n newestTimestamps: " + newestTimestamps);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 918e99a..1fc428d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -287,6 +288,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       convertWALsToHFiles();
       incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
               backupInfo.getBackupRootDir());
+      // Save list of WAL files copied
+      backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
       String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
       // fail the overall backup and return
@@ -298,7 +301,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     // After this checkpoint, even if entering cancel process, will let the backup finished
     try {
       // Set the previousTimestampMap which is before this current log roll to the manifest.
-      Map<TableName, Map<String, Long>> previousTimestampMap =
+      HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
           backupManager.readLogTimestampMap();
       backupInfo.setIncrTimestampMap(previousTimestampMap);
 
@@ -306,10 +309,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       // For incremental backup, it contains the incremental backup table set.
       backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
 
-      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
           backupManager.readLogTimestampMap();
 
-      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
       Long newStartCode =
           BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
       backupManager.writeBackupStartCode(newStartCode);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 57f3a50..0213414 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +61,7 @@ public abstract class TableBackupClient {
   protected Connection conn;
   protected String backupId;
   protected List<TableName> tableList;
-  protected Map<String, Long> newTimestamps = null;
+  protected HashMap<String, Long> newTimestamps = null;
 
   protected BackupManager backupManager;
   protected BackupInfo backupInfo;
@@ -295,7 +294,7 @@ public abstract class TableBackupClient {
 
       if (type == BackupType.INCREMENTAL) {
         // We'll store the log timestamps for this table only in its manifest.
-        Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
+        HashMap<TableName, HashMap<String, Long>> tableTimestampMap = new HashMap<>();
         tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table));
         manifest.setIncrTimestampMap(tableTimestampMap);
         ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupInfo);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
index 79404b3..8d9400f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.backup.master;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.net.Address;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
+
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
 
 /**
@@ -79,77 +76,46 @@ public class BackupLogCleaner extends BaseLogCleanerDelegate {
     }
   }
 
-
-  private Map<Address, Long> getServersToOldestBackupMapping(List<BackupInfo> backups)
-    throws IOException {
-    Map<Address, Long> serverAddressToLastBackupMap = new HashMap<>();
-
-    Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
-    for (BackupInfo backupInfo : backups) {
-      for (TableName table : backupInfo.getTables()) {
-        tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
-        if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
-          tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
-          for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
-            .entrySet()) {
-            serverAddressToLastBackupMap.put(Address.fromString(entry.getKey()), entry.getValue());
-          }
-        }
-      }
-    }
-
-    return serverAddressToLastBackupMap;
-  }
-
   @Override
   public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
-    List<FileStatus> filteredFiles = new ArrayList<>();
-
     // all members of this class are null if backup is disabled,
     // so we cannot filter the files
     if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
       LOG.debug("Backup is not enabled. Check your {} setting",
-        BackupRestoreConstants.BACKUP_ENABLE_KEY);
+          BackupRestoreConstants.BACKUP_ENABLE_KEY);
       return files;
     }
 
-    Map<Address, Long> addressToLastBackupMap;
-    try {
-      try (BackupManager backupManager = new BackupManager(conn, getConf())) {
-        addressToLastBackupMap =
-          getServersToOldestBackupMapping(backupManager.getBackupHistory(true));
-      }
-    } catch (IOException ex) {
-      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
-        ex.getMessage(), ex);
-      return Collections.emptyList();
-    }
-    for (FileStatus file : files) {
-      String fn = file.getPath().getName();
-      if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
-        filteredFiles.add(file);
-        continue;
-      }
-
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      // If we do not have recorded backup sessions
       try {
-        Address walServerAddress =
-          Address.fromString(BackupUtils.parseHostNameFromLogFile(file.getPath()));
-        long walTimestamp = AbstractFSWALProvider.getTimestamp(file.getPath().getName());
-
-        if (!addressToLastBackupMap.containsKey(walServerAddress)
-          || addressToLastBackupMap.get(walServerAddress) >= walTimestamp) {
-          filteredFiles.add(file);
+        if (!table.hasBackupSessions()) {
+          LOG.trace("BackupLogCleaner has no backup sessions");
+          return files;
+        }
+      } catch (TableNotFoundException tnfe) {
+        LOG.warn("Backup system table is not available: {}", tnfe.getMessage());
+        return files;
+      }
+      List<FileStatus> list = new ArrayList<>();
+      Map<FileStatus, Boolean> walFilesDeletableMap = table.areWALFilesDeletable(files);
+      for (Map.Entry<FileStatus, Boolean> entry: walFilesDeletableMap.entrySet()) {
+        FileStatus file = entry.getKey();
+        String wal = file.getPath().toString();
+        boolean deletable = entry.getValue();
+        if (deletable) {
+          LOG.debug("Found log file in backup system table, deleting: {}", wal);
+          list.add(file);
+        } else {
+          LOG.debug("Did not find this log in backup system table, keeping: {}", wal);
         }
-      } catch (Exception ex) {
-        LOG.warn(
-          "Error occurred while filtering file: {} with error: {}. Ignoring cleanup of this log",
-          file.getPath(), ex.getMessage());
       }
+      return list;
+    } catch (IOException e) {
+      LOG.error("Failed to get backup system table table, therefore will keep all files", e);
+      // nothing to delete
+      return Collections.emptyList();
     }
-
-    LOG
-      .info("Total files: {}, Filtered Files: {}", IterableUtils.size(files), filteredFiles.size());
-    return filteredFiles;
   }
 
   @Override
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 90bb4fd..2f81bfc 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -27,7 +27,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -83,8 +82,8 @@ public final class BackupUtils {
    * @param rsLogTimestampMap timestamp map
    * @return the min timestamp of each RS
    */
-  public static Map<String, Long> getRSLogTimestampMins(
-      Map<TableName, Map<String, Long>> rsLogTimestampMap) {
+  public static HashMap<String, Long> getRSLogTimestampMins(
+      HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) {
     if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
       return null;
     }
@@ -92,14 +91,18 @@ public final class BackupUtils {
     HashMap<String, Long> rsLogTimestampMins = new HashMap<>();
     HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = new HashMap<>();
 
-    for (Entry<TableName, Map<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
+    for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
       TableName table = tableEntry.getKey();
-      Map<String, Long> rsLogTimestamp = tableEntry.getValue();
+      HashMap<String, Long> rsLogTimestamp = tableEntry.getValue();
       for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) {
         String rs = rsEntry.getKey();
         Long ts = rsEntry.getValue();
-        rsLogTimestampMapByRS.putIfAbsent(rs, new HashMap<>());
-        rsLogTimestampMapByRS.get(rs).put(table, ts);
+        if (!rsLogTimestampMapByRS.containsKey(rs)) {
+          rsLogTimestampMapByRS.put(rs, new HashMap<>());
+          rsLogTimestampMapByRS.get(rs).put(table, ts);
+        } else {
+          rsLogTimestampMapByRS.get(rs).put(table, ts);
+        }
       }
     }
 
@@ -346,7 +349,7 @@ public final class BackupUtils {
    * @param map map
    * @return the min value
    */
-  public static <T> Long getMinValue(Map<T, Long> map) {
+  public static <T> Long getMinValue(HashMap<T, Long> map) {
     Long minTimestamp = null;
     if (map != null) {
       ArrayList<Long> timestampList = new ArrayList<>(map.values());
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 8a06425..80ffe55 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -94,6 +92,7 @@ public class TestBackupBase {
   protected static TableName table1_restore = TableName.valueOf("default:table1");
   protected static TableName table2_restore = TableName.valueOf("ns2:table2");
   protected static TableName table3_restore = TableName.valueOf("ns3:table3_restore");
+  protected static TableName table4_restore = TableName.valueOf("ns4:table4_restore");
 
   protected static final int NB_ROWS_IN_BATCH = 99;
   protected static final byte[] qualName = Bytes.toBytes("q1");
@@ -136,12 +135,14 @@ public class TestBackupBase {
         incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
           backupInfo.getBackupRootDir());
         failStageIf(Stage.stage_2);
+        // Save list of WAL files copied
+        backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
 
         // case INCR_BACKUP_COMPLETE:
         // set overall backup status: complete. Here we make sure to complete the backup.
         // After this checkpoint, even if entering cancel process, will let the backup finished
         // Set the previousTimestampMap which is before this current log roll to the manifest.
-        Map<TableName, Map<String, Long>> previousTimestampMap =
+        HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
             backupManager.readLogTimestampMap();
         backupInfo.setIncrTimestampMap(previousTimestampMap);
 
@@ -150,7 +151,7 @@ public class TestBackupBase {
         backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
         failStageIf(Stage.stage_3);
 
-        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+        HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
             backupManager.readLogTimestampMap();
 
         Long newStartCode =
@@ -211,6 +212,14 @@ public class TestBackupBase {
           LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
         failStageIf(Stage.stage_2);
         newTimestamps = backupManager.readRegionServerLastLogRollResult();
+        if (firstBackup) {
+          // Updates registered log files
+          // We record ALL old WAL files as registered, because
+          // this is a first full backup in the system and these
+          // files are not needed for next incremental backup
+          List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
+          backupManager.recordWALFiles(logFiles);
+        }
 
         // SNAPSHOT_TABLES:
         backupInfo.setPhase(BackupPhase.SNAPSHOT);
@@ -238,7 +247,7 @@ public class TestBackupBase {
         // For incremental backup, it contains the incremental backup table set.
         backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
 
-        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+        HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
             backupManager.readLogTimestampMap();
 
         Long newStartCode =
@@ -421,10 +430,15 @@ public class TestBackupBase {
     Admin ha = TEST_UTIL.getAdmin();
 
     // Create namespaces
-    ha.createNamespace(NamespaceDescriptor.create("ns1").build());
-    ha.createNamespace(NamespaceDescriptor.create("ns2").build());
-    ha.createNamespace(NamespaceDescriptor.create("ns3").build());
-    ha.createNamespace(NamespaceDescriptor.create("ns4").build());
+    NamespaceDescriptor desc1 = NamespaceDescriptor.create("ns1").build();
+    NamespaceDescriptor desc2 = NamespaceDescriptor.create("ns2").build();
+    NamespaceDescriptor desc3 = NamespaceDescriptor.create("ns3").build();
+    NamespaceDescriptor desc4 = NamespaceDescriptor.create("ns4").build();
+
+    ha.createNamespace(desc1);
+    ha.createNamespace(desc2);
+    ha.createNamespace(desc3);
+    ha.createNamespace(desc4);
 
     TableDescriptor desc = TableDescriptorBuilder.newBuilder(table1)
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
@@ -493,21 +507,6 @@ public class TestBackupBase {
     return ret;
   }
 
-  protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
-    Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
-    FileSystem fs = logRoot.getFileSystem(c);
-    RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
-    List<FileStatus> logFiles = new ArrayList<FileStatus>();
-    while (it.hasNext()) {
-      LocatedFileStatus lfs = it.next();
-      if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
-        logFiles.add(lfs);
-        LOG.info(Objects.toString(lfs));
-      }
-    }
-    return logFiles;
-  }
-
   protected void dumpBackupDir() throws IOException {
     // Dump Backup Dir
     FileSystem fs = FileSystem.get(conf1);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
index e5a6679..b01a136 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hbase.backup;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,6 +35,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
@@ -266,12 +270,12 @@ public class TestBackupSystemTable {
 
     table.writeRegionServerLogTimestamp(tables, rsTimestampMap, "root");
 
-    Map<TableName, Map<String, Long>> result = table.readLogTimestampMap("root");
+    HashMap<TableName, HashMap<String, Long>> result = table.readLogTimestampMap("root");
 
     assertTrue(tables.size() == result.size());
 
     for (TableName t : tables) {
-      Map<String, Long> rstm = result.get(t);
+      HashMap<String, Long> rstm = result.get(t);
       assertNotNull(rstm);
       assertEquals(rstm.get("rs1:100"), new Long(100L));
       assertEquals(rstm.get("rs2:100"), new Long(101L));
@@ -297,7 +301,7 @@ public class TestBackupSystemTable {
     assertTrue(5 == result.size());
 
     for (TableName t : tables) {
-      Map<String, Long> rstm = result.get(t);
+      HashMap<String, Long> rstm = result.get(t);
       assertNotNull(rstm);
       if (t.equals(TableName.valueOf("t3")) == false) {
         assertEquals(rstm.get("rs1:100"), new Long(100L));
@@ -311,7 +315,7 @@ public class TestBackupSystemTable {
     }
 
     for (TableName t : tables1) {
-      Map<String, Long> rstm = result.get(t);
+      HashMap<String, Long> rstm = result.get(t);
       assertNotNull(rstm);
       assertEquals(rstm.get("rs1:100"), new Long(200L));
       assertEquals(rstm.get("rs2:100"), new Long(201L));
@@ -322,6 +326,43 @@ public class TestBackupSystemTable {
 
   }
 
+  @Test
+  public void testAddWALFiles() throws IOException {
+    // Three WALs that get registered, plus one that is never registered.
+    List<String> registeredWals =
+        Arrays.asList("hdfs://server/WALs/srv1,101,15555/srv1,101,15555.default.1",
+          "hdfs://server/WALs/srv2,102,16666/srv2,102,16666.default.2",
+          "hdfs://server/WALs/srv3,103,17777/srv3,103,17777.default.3");
+    String unregisteredWal = "hdfs://server/WALs/srv1,101,15555/srv1,101,15555.default.5";
+
+    table.addWALFiles(registeredWals, "backup", "root");
+
+    // Single-file lookup: only the registered WALs are reported as deletable.
+    for (String wal : registeredWals) {
+      assertTrue(table.isWALFileDeletable(wal));
+    }
+    assertFalse(table.isWALFileDeletable(unregisteredWal));
+
+    // Batch lookup through areWALFilesDeletable is checked against the same expectations.
+    List<FileStatus> statuses = new ArrayList<>();
+    for (String wal : registeredWals) {
+      FileStatus status = new FileStatus();
+      status.setPath(new Path(wal));
+      statuses.add(status);
+    }
+    FileStatus unregisteredStatus = new FileStatus();
+    unregisteredStatus.setPath(new Path(unregisteredWal));
+    statuses.add(unregisteredStatus);
+
+    Map<FileStatus, Boolean> deletableByStatus = table.areWALFilesDeletable(statuses);
+    for (int i = 0; i < registeredWals.size(); i++) {
+      assertTrue(deletableByStatus.get(statuses.get(i)));
+    }
+    assertFalse(deletableByStatus.get(unregisteredStatus));
+
+    cleanBackupTable();
+  }
+
   /**
    * Backup set tests
    */
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
index 5363b1a..6b8011e 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
@@ -19,11 +19,19 @@ package org.apache.hadoop.hbase.backup.master;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import java.util.HashMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.TestBackupBase;
@@ -32,14 +40,16 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
@@ -54,7 +64,6 @@ public class TestBackupLogCleaner extends TestBackupBase {
 
   // implements all test cases in 1 test since incremental full backup/
   // incremental backup has dependencies
-
   @Test
   public void testBackupLogCleaner() throws Exception {
 
@@ -68,11 +77,10 @@ public class TestBackupLogCleaner extends TestBackupBase {
       assertFalse(systemTable.hasBackupSessions());
 
       List<FileStatus> walFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
+      List<String> swalFiles = convert(walFiles);
       BackupLogCleaner cleaner = new BackupLogCleaner();
       cleaner.setConf(TEST_UTIL.getConfiguration());
-      Map<String, Object> params = new HashMap<>();
-      params.put(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster());
-      cleaner.init(params);
+      cleaner.init(null);
       cleaner.setConf(TEST_UTIL.getConfiguration());
 
       Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walFiles);
@@ -81,6 +89,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
       // We can delete all files because we do not have yet recorded backup sessions
       assertTrue(size == walFiles.size());
 
+      systemTable.addWALFiles(swalFiles, "backup", "root");
       String backupIdFull = fullTableBackup(tableSetFullList);
       assertTrue(checkSucceeded(backupIdFull));
       // Check one more time
@@ -91,6 +100,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
 
       List<FileStatus> newWalFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
       LOG.debug("WAL list after full backup");
+      convert(newWalFiles);
 
       // New list of wal files is greater than the previous one,
       // because new wal per RS have been opened after full backup
@@ -130,4 +140,29 @@ public class TestBackupLogCleaner extends TestBackupBase {
       conn.close();
     }
   }
+
+  private List<String> convert(List<FileStatus> walFiles) {
+    List<String> walPaths = new ArrayList<>();
+    walFiles.forEach(status -> {
+      LOG.debug("+++WAL: " + status.getPath().toString());
+      walPaths.add(status.getPath().toString());
+    });
+    return walPaths;
+  }
+
+  private List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
+    Path walDir = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
+    // Recursive listing; non-files and paths flagged by isMetaFile are skipped.
+    RemoteIterator<LocatedFileStatus> entries = walDir.getFileSystem(c).listFiles(walDir, true);
+    List<FileStatus> walFiles = new ArrayList<>();
+    while (entries.hasNext()) {
+      LocatedFileStatus entry = entries.next();
+      if (entry.isFile() && !AbstractFSWALProvider.isMetaFile(entry.getPath())) {
+        walFiles.add(entry);
+        LOG.info(Objects.toString(entry));
+      }
+    }
+    return walFiles;
+  }
+
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index afe4312..6084e17 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -91,11 +91,7 @@ message BackupInfo {
   optional uint32 progress = 10;
   optional uint32 workers_number = 11;
   optional uint64 bandwidth = 12;
-  map<string, RSTimestampMap> table_set_timestamp = 13;
 
-  message RSTimestampMap {
-    map<string, uint64> rs_timestamp = 1;
-  }
   /**
    * Backup session states
    */