Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/14 04:11:52 UTC

hbase git commit: HBASE-20117 Cleanup the unused replication barriers in meta table

Repository: hbase
Updated Branches:
  refs/heads/master b16e03c13 -> b7308ee01


HBASE-20117 Cleanup the unused replication barriers in meta table


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b7308ee0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b7308ee0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b7308ee0

Branch: refs/heads/master
Commit: b7308ee01c653051db4060ba28dadb5bcc74b7de
Parents: b16e03c
Author: zhangduo <zh...@apache.org>
Authored: Tue Mar 13 21:36:06 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Mar 14 12:08:15 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/MetaTableAccessor.java  |   2 +-
 .../hbase/replication/ReplicationUtils.java     |  38 +++
 .../org/apache/hadoop/hbase/master/HMaster.java |  91 +++---
 .../cleaner/ReplicationBarrierCleaner.java      | 162 ++++++++++
 .../replication/ReplicationPeerManager.java     |  10 +
 .../NamespaceTableCfWALEntryFilter.java         |  39 +--
 .../cleaner/TestReplicationBarrierCleaner.java  | 293 +++++++++++++++++++
 .../TestSerialReplicationChecker.java           |   2 +-
 8 files changed, 556 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 5f1a8b7..57a0350 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -2056,7 +2056,7 @@ public class MetaTableAccessor {
     return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
   }
 
-  private static long[] getReplicationBarriers(Result result) {
+  public static long[] getReplicationBarriers(Result result) {
     return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
       .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
   }
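
For context: the visibility change above is what lets the new ReplicationBarrierCleaner
(added below) parse barrier cells out of the meta rows it reads. A minimal sketch of a
caller; the helper and its Get-based access (including Get.readAllVersions()) are
illustrative assumptions, not part of this change:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.MetaTableAccessor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;

    public class BarrierReadExample {
      // Read the sorted, de-duplicated barrier sequence ids for one region row in meta.
      static long[] readBarriers(Connection conn, byte[] regionName) throws IOException {
        try (Table meta = conn.getTable(TableName.META_TABLE_NAME)) {
          Result result = meta.get(new Get(regionName)
              .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions());
          return MetaTableAccessor.getReplicationBarriers(result);
        }
      }
    }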

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 2a6870a..e2479e0 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -135,4 +135,42 @@ public final class ReplicationUtils {
     return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
   }
+
+  /**
+   * Returns whether we should replicate the given table.
+   */
+  public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
+    String namespace = tableName.getNamespaceAsString();
+    if (peerConfig.replicateAllUserTables()) {
+      // replicate all user tables, filtered by the exclude-namespaces and exclude-table-cfs configs
+      Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
+      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
+        return false;
+      }
+      Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap();
+      // Trap: check for the key's existence first, since HashMap allows null values.
+      if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
+        return true;
+      }
+      List<String> cfs = excludedTableCFs.get(tableName);
+      // If cfs is null or empty then we do not need to replicate this table;
+      // otherwise, we may still need to replicate the table but filter out some families.
+      return cfs != null && !cfs.isEmpty();
+    } else {
+      // Not replicating all user tables, so filter by the namespaces and table-cfs configs
+      Set<String> namespaces = peerConfig.getNamespaces();
+      Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
+
+      if (namespaces == null && tableCFs == null) {
+        return false;
+      }
+
+      // First filter by the namespaces config. If the table's namespace is in the peer config,
+      // all the table's data is applicable for replication.
+      if (namespaces != null && namespaces.contains(namespace)) {
+        return true;
+      }
+      return tableCFs != null && tableCFs.containsKey(tableName);
+    }
+  }
 }
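
For context on the new ReplicationUtils.contains: a minimal usage sketch for a
replicate-all peer with an exclude-namespaces list. The builder calls
(ReplicationPeerConfig.newBuilder(), setClusterKey, setReplicateAllUserTables,
setExcludeNamespaces) are assumed from the client API on this branch, and the cluster
key value is a placeholder:

    import java.util.Collections;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
    import org.apache.hadoop.hbase.replication.ReplicationUtils;

    public class ContainsExample {
      public static void main(String[] args) {
        // A peer that replicates every user table except those in the 'backup' namespace.
        ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
            .setClusterKey("zk1:2181:/hbase")
            .setReplicateAllUserTables(true)
            .setExcludeNamespaces(Collections.singleton("backup"))
            .build();
        // true: 'default:t1' is not excluded.
        System.out.println(ReplicationUtils.contains(peerConfig, TableName.valueOf("t1")));
        // false: everything under 'backup' is excluded.
        System.out.println(
          ReplicationUtils.contains(peerConfig, TableName.valueOf("backup", "t1")));
      }
    }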

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 1ae85fe..81dd667 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -108,6 +109,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -364,6 +366,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   CatalogJanitor catalogJanitorChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
+  private ReplicationBarrierCleaner replicationBarrierCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
   private MobCompactionChore mobCompactChore;
   private MasterMobCompactionThread mobCompactThread;
@@ -1151,19 +1154,30 @@ public class HMaster extends HRegionServer implements MasterServices {
          getMasterWalManager().getOldLogDir());
     getChoreService().scheduleChore(logCleaner);
 
-   //start the hfile archive cleaner thread
+    // start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
     Map<String, Object> params = new HashMap<>();
     params.put(MASTER, this);
     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
         .getFileSystem(), archiveDir, params);
     getChoreService().scheduleChore(hfileCleaner);
+
+    replicationBarrierCleaner =
+      new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
+    getChoreService().scheduleChore(replicationBarrierCleaner);
+
     serviceStarted = true;
     if (LOG.isTraceEnabled()) {
       LOG.trace("Started service threads");
     }
   }
 
+  private void cancelChore(ScheduledChore chore) {
+    if (chore != null) {
+      chore.cancel();
+    }
+  }
+
   @Override
   protected void stopServiceThreads() {
     if (masterJettyServer != null) {
@@ -1177,24 +1191,33 @@ public class HMaster extends HRegionServer implements MasterServices {
     super.stopServiceThreads();
     stopChores();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Stopping service threads");
-    }
+    LOG.debug("Stopping service threads");
 
-    // Clean up and close up shop
-    if (this.logCleaner != null) this.logCleaner.cancel(true);
-    if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
-    if (this.quotaManager != null) this.quotaManager.stop();
+    if (this.quotaManager != null) {
+      this.quotaManager.stop();
+    }
 
-    if (this.activeMasterManager != null) this.activeMasterManager.stop();
-    if (this.serverManager != null) this.serverManager.stop();
-    if (this.assignmentManager != null) this.assignmentManager.stop();
+    if (this.activeMasterManager != null) {
+      this.activeMasterManager.stop();
+    }
+    if (this.serverManager != null) {
+      this.serverManager.stop();
+    }
+    if (this.assignmentManager != null) {
+      this.assignmentManager.stop();
+    }
 
     stopProcedureExecutor();
 
-    if (this.walManager != null) this.walManager.stop();
-    if (this.fileSystemManager != null) this.fileSystemManager.stop();
-    if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
+    if (this.walManager != null) {
+      this.walManager.stop();
+    }
+    if (this.fileSystemManager != null) {
+      this.fileSystemManager.stop();
+    }
+    if (this.mpmHost != null) {
+      this.mpmHost.stop("server shutting down.");
+    }
   }
 
   private void startProcedureExecutor() throws IOException {
@@ -1233,37 +1256,21 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   private void stopChores() {
-    if (this.expiredMobFileCleanerChore != null) {
-      this.expiredMobFileCleanerChore.cancel(true);
-    }
-    if (this.mobCompactChore != null) {
-      this.mobCompactChore.cancel(true);
-    }
-    if (this.balancerChore != null) {
-      this.balancerChore.cancel(true);
-    }
-    if (this.normalizerChore != null) {
-      this.normalizerChore.cancel(true);
-    }
-    if (this.clusterStatusChore != null) {
-      this.clusterStatusChore.cancel(true);
-    }
-    if (this.catalogJanitorChore != null) {
-      this.catalogJanitorChore.cancel(true);
-    }
-    if (this.clusterStatusPublisherChore != null){
-      clusterStatusPublisherChore.cancel(true);
-    }
+    cancelChore(this.expiredMobFileCleanerChore);
+    cancelChore(this.mobCompactChore);
+    cancelChore(this.balancerChore);
+    cancelChore(this.normalizerChore);
+    cancelChore(this.clusterStatusChore);
+    cancelChore(this.catalogJanitorChore);
+    cancelChore(this.clusterStatusPublisherChore);
     if (this.mobCompactThread != null) {
       this.mobCompactThread.close();
     }
-
-    if (this.quotaObserverChore != null) {
-      quotaObserverChore.cancel();
-    }
-    if (this.snapshotQuotaChore != null) {
-      snapshotQuotaChore.cancel();
-    }
+    cancelChore(this.quotaObserverChore);
+    cancelChore(this.snapshotQuotaChore);
+    cancelChore(this.logCleaner);
+    cancelChore(this.hfileCleaner);
+    cancelChore(this.replicationBarrierCleaner);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
new file mode 100644
index 0000000..16b8fc5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to clean the useless barriers in {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family in
+ * meta table.
+ */
+@InterfaceAudience.Private
+public class ReplicationBarrierCleaner extends ScheduledChore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);
+
+  private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
+    "hbase.master.cleaner.replication.barrier.interval";
+
+  // 12 hours. Usually regions will not be moved, so the barriers are rarely updated. Use a large
+  // interval.
+  private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;
+
+  private final Connection conn;
+
+  private final ReplicationPeerManager peerManager;
+
+  public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
+      ReplicationPeerManager peerManager) {
+    super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
+      DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
+    this.conn = conn;
+    this.peerManager = peerManager;
+  }
+
+  @Override
+  protected void chore() {
+    long totalRows = 0;
+    long cleanedRows = 0;
+    long deletedRows = 0;
+    long deletedBarriers = 0;
+    TableName tableName = null;
+    List<String> peerIds = null;
+    try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
+        ResultScanner scanner = metaTable.getScanner(
+          new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
+      for (;;) {
+        Result result = scanner.next();
+        if (result == null) {
+          break;
+        }
+        totalRows++;
+        long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
+        if (barriers.length == 0) {
+          continue;
+        }
+        byte[] regionName = result.getRow();
+        TableName tn = RegionInfo.getTable(regionName);
+        if (!tn.equals(tableName)) {
+          tableName = tn;
+          peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
+        }
+        if (peerIds.isEmpty()) {
+          // no serial replication, only keep the newest barrier
+          Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
+            HConstants.SEQNUM_QUALIFIER);
+          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
+            cell.getTimestamp() - 1));
+          cleanedRows++;
+          deletedBarriers += barriers.length - 1;
+          continue;
+        }
+        String encodedRegionName = RegionInfo.encodeRegionName(regionName);
+        long pushedSeqId = Long.MAX_VALUE;
+        for (String peerId : peerIds) {
+          pushedSeqId = Math.min(pushedSeqId,
+            peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
+        }
+        int index = Arrays.binarySearch(barriers, pushedSeqId);
+        if (index == -1) {
+          // the pushed seq id is before the first barrier; usually this should not happen, but
+          // let's add a check for it anyway.
+          continue;
+        }
+        if (index < 0) {
+          index = -index - 1;
+        } else {
+          index++;
+        }
+        // A special case for a merged/split region, where we are in the last closed range and the
+        // pushedSeqId is the last barrier minus 1.
+        if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
+          // check if the region has already been removed, i.e., it has no catalog family
+          if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
+            metaTable
+              .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
+            deletedRows++;
+            deletedBarriers += barriers.length;
+            continue;
+          }
+        }
+        // the barriers before 'index - 1' (exclusive) can be safely removed. See the algorithm in
+        // SerialReplicationChecker for more details.
+        if (index - 1 > 0) {
+          List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
+            HConstants.SEQNUM_QUALIFIER);
+          // All barriers before this cell (exclusive) can be removed
+          Cell cell = cells.get(cells.size() - index);
+          metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
+            cell.getTimestamp() - 1));
+          cleanedRows++;
+          deletedBarriers += index - 1;
+        }
+      }
+    } catch (ReplicationException | IOException e) {
+      LOG.warn("Failed to clean up replication barrier", e);
+    }
+    if (totalRows > 0) {
+      LOG.info(
+        "Cleanup replication barriers: " +
+          "totalRows {}, cleanedRows {}, deletedRows {}, deletedBarriers {}",
+        totalRows, cleanedRows, deletedRows, deletedBarriers);
+    }
+  }
+
+}
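
To make the index arithmetic in chore() concrete: a self-contained sketch of just the
barrier-search step, using the same barrier list as testDeleteBarriers below. This is
illustrative only; the real code first takes the minimum pushed sequence id across all
serial peers, and the chore's period is hbase.master.cleaner.replication.barrier.interval,
defaulting to 12 hours as defined above.

    import java.util.Arrays;

    public class BarrierIndexExample {
      public static void main(String[] args) {
        long[] barriers = { 10, 20, 30, 40, 50, 60 };
        for (long pushedSeqId : new long[] { 5, 10, 15, 59 }) {
          int index = Arrays.binarySearch(barriers, pushedSeqId);
          if (index == -1) {
            // insertion point 0: pushed seq id is before the first barrier, nothing to delete
            System.out.println(pushedSeqId + " -> skip");
            continue;
          }
          // normalize to the position of the first barrier strictly greater than pushedSeqId
          index = index < 0 ? -index - 1 : index + 1;
          // barriers at positions smaller than 'index - 1' can be deleted
          System.out.println(pushedSeqId + " -> delete " + (index - 1) + " barrier(s)");
        }
      }
    }

For pushedSeqId 59 this prints "delete 4 barrier(s)", leaving { 50, 60 }, which matches
the "between 50 and 60" step in testDeleteBarriers.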

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 19fc7f4..7620638 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -327,6 +327,16 @@ public class ReplicationPeerManager {
     }
   }
 
+  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
+    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
+      .filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId())
+      .collect(Collectors.toList());
+  }
+
+  public ReplicationQueueStorage getQueueStorage() {
+    return queueStorage;
+  }
+
   public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
       throws ReplicationException {
     ReplicationPeerStorage peerStorage =

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 08c9f37..3a3200a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -53,44 +52,10 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
 
   @Override
   public Entry filter(Entry entry) {
-    TableName tabName = entry.getKey().getTableName();
-    String namespace = tabName.getNamespaceAsString();
-    ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
-
-    if (peerConfig.replicateAllUserTables()) {
-      // replicate all user tables, but filter by exclude namespaces config
-      Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
-
-      // return null(prevent replicating) if logKey's table is in this peer's
-      // exclude namespaces list
-      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
-        return null;
-      }
-
+    if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
       return entry;
     } else {
-      // Not replicate all user tables, so filter by namespaces and table-cfs config
-      Set<String> namespaces = peerConfig.getNamespaces();
-      Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
-
-      if (namespaces == null && tableCFs == null) {
-        return null;
-      }
-
-      // First filter by namespaces config
-      // If table's namespace in peer config, all the tables data are applicable for replication
-      if (namespaces != null && namespaces.contains(namespace)) {
-        return entry;
-      }
-
-      // Then filter by table-cfs config
-      // return null(prevent replicating) if logKey's table isn't in this peer's
-      // replicable tables list
-      if (tableCFs == null || !tableCFs.containsKey(tabName)) {
-        return null;
-      }
-
-      return entry;
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
new file mode 100644
index 0000000..671bc22
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationBarrierCleaner {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationBarrierCleaner.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBarrierCleaner.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @Rule
+  public final TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+        ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
+          .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
+      for (;;) {
+        Result result = scanner.next();
+        if (result == null) {
+          break;
+        }
+        TableName tableName = RegionInfo.getTable(result.getRow());
+        if (!tableName.isSystemTable()) {
+          table.delete(new Delete(result.getRow()));
+        }
+      }
+    }
+  }
+
+  private ReplicationPeerManager create(ReplicationQueueStorage queueStorage,
+      List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>... peerIds) {
+    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
+    if (queueStorage != null) {
+      when(peerManager.getQueueStorage()).thenReturn(queueStorage);
+    }
+    if (peerIds.length == 0) {
+      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds);
+    } else {
+      when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds,
+        peerIds);
+    }
+    return peerManager;
+  }
+
+  private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
+      throws ReplicationException {
+    ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
+    if (lastPushedSeqIds.length == 0) {
+      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
+    } else {
+      when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
+        lastPushedSeqIds);
+    }
+    return queueStorage;
+  }
+
+  private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException {
+    return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(),
+      UTIL.getConnection(), peerManager);
+  }
+
+  private void addBarrier(RegionInfo region, long... barriers) throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    for (int i = 0; i < barriers.length; i++) {
+      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
+    }
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void fillCatalogFamily(RegionInfo region) throws IOException {
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY,
+        Bytes.toBytes("whatever"), Bytes.toBytes("whatever")));
+    }
+  }
+
+  private void clearCatalogFamily(RegionInfo region) throws IOException {
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY));
+    }
+  }
+
+  @Test
+  public void testNothing() throws IOException {
+    ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+    cleaner.chore();
+    verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class));
+    verify(peerManager, never()).getQueueStorage();
+  }
+
+  @Test
+  public void testCleanNoPeers() throws IOException {
+    TableName tableName1 = TableName.valueOf(name.getMethodName() + "_1");
+    RegionInfo region11 =
+      RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build();
+    addBarrier(region11, 10, 20, 30, 40, 50, 60);
+    RegionInfo region12 =
+      RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build();
+    addBarrier(region12, 20, 30, 40, 50, 60, 70);
+
+    TableName tableName2 = TableName.valueOf(name.getMethodName() + "_2");
+    RegionInfo region21 =
+      RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build();
+    addBarrier(region21, 100, 200, 300, 400);
+    RegionInfo region22 =
+      RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build();
+    addBarrier(region22, 200, 300, 400, 500, 600);
+
+    @SuppressWarnings("unchecked")
+    ReplicationPeerManager peerManager =
+      create(null, Collections.emptyList(), Collections.emptyList());
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+    cleaner.chore();
+
+    // should never call this method
+    verify(peerManager, never()).getQueueStorage();
+    // should only be called twice although we have 4 regions to clean
+    verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class));
+
+    assertArrayEquals(new long[] { 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region11.getRegionName()));
+    assertArrayEquals(new long[] { 70 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region12.getRegionName()));
+
+    assertArrayEquals(new long[] { 400 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region21.getRegionName()));
+    assertArrayEquals(new long[] { 600 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region22.getRegionName()));
+  }
+
+  @Test
+  public void testDeleteBarriers() throws IOException, ReplicationException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addBarrier(region, 10, 20, 30, 40, 50, 60);
+    // two peers
+    ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L);
+    List<String> peerIds = Lists.newArrayList("1", "2");
+
+    @SuppressWarnings("unchecked")
+    ReplicationPeerManager peerManager =
+      create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds);
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+
+    // the pushed seq id is before the first barrier, no deletion
+    cleaner.chore();
+    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // in the first range, still no deletion
+    cleaner.chore();
+    assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // in the second range, 10 is deleted
+    cleaner.chore();
+    assertArrayEquals(new long[] { 20, 30, 40, 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // between 50 and 60, so the barriers before 50 will be deleted
+    cleaner.chore();
+    assertArrayEquals(new long[] { 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // in the last open range, 50 is deleted
+    cleaner.chore();
+    assertArrayEquals(new long[] { 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+  }
+
+  @Test
+  public void testDeleteRowForDeletedRegion() throws IOException, ReplicationException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addBarrier(region, 40, 50, 60);
+    fillCatalogFamily(region);
+
+    ReplicationQueueStorage queueStorage = create(59L);
+    @SuppressWarnings("unchecked")
+    ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList("1"));
+    ReplicationBarrierCleaner cleaner = create(peerManager);
+
+    // we have something in catalog family, so only delete 40
+    cleaner.chore();
+    assertArrayEquals(new long[] { 50, 60 },
+      MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+    // No catalog family, then we should remove the whole row
+    clearCatalogFamily(region);
+    cleaner.chore();
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      assertFalse(table
+        .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
+    }
+  }
+
+  private static class WarnOnlyStoppable implements Stoppable {
+    @Override
+    public void stop(String why) {
+      LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: " + why);
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7308ee0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 58e9543..29749bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -157,7 +157,7 @@ public class TestSerialReplicationChecker {
     }
     for (int i = 0; i < barriers.length; i++) {
       put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
-        put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
+        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
     }
     try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
       table.put(put);
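
For reference on the timestamp arithmetic fixed here: with three barriers and a base
timestamp T, the old expression put.getTimeStamp() - i wrote the cells at T, T-1, T-2,
timestamps descending while the barrier values ascend, so the smallest barrier became
the newest cell version. The fixed expression put.getTimeStamp() - barriers.length + i
writes them at T-3, T-2, T-1, ascending with the barrier values, so getColumnLatestCell
returns the largest barrier, which is what the "only keep the newest barrier" path in
ReplicationBarrierCleaner relies on. The addBarrier helper in the new
TestReplicationBarrierCleaner above uses the same corrected arithmetic.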