You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:19:34 UTC
[14/22] hbase git commit: HBASE-20117 Cleanup the unused replication
barriers in meta table
HBASE-20117 Cleanup the unused replication barriers in meta table
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/644bfe36
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/644bfe36
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/644bfe36
Branch: refs/heads/HBASE-20046-branch-2
Commit: 644bfe36b297b2787bf07a46eb6f5085322edfa9
Parents: fedf3ca
Author: zhangduo <zh...@apache.org>
Authored: Tue Mar 13 21:36:06 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/MetaTableAccessor.java | 2 +-
.../hbase/replication/ReplicationUtils.java | 56 +++-
.../org/apache/hadoop/hbase/master/HMaster.java | 91 +++---
.../cleaner/ReplicationBarrierCleaner.java | 162 ++++++++++
.../replication/ReplicationPeerManager.java | 10 +
.../NamespaceTableCfWALEntryFilter.java | 39 +--
.../cleaner/TestReplicationBarrierCleaner.java | 293 +++++++++++++++++++
.../TestSerialReplicationChecker.java | 2 +-
8 files changed, 565 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 2a88b56..a800c1c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -2053,7 +2053,7 @@ public class MetaTableAccessor {
return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
- private static long[] getReplicationBarriers(Result result) {
+ /**
+ * Extracts the replication barriers carried by the given result.
+ * @param result the result whose {@link HConstants#REPLICATION_BARRIER_FAMILY} /
+ *          {@link HConstants#SEQNUM_QUALIFIER} cells are read
+ * @return the barrier values of those cells, sorted in ascending order with duplicates removed
+ */
+ public static long[] getReplicationBarriers(Result result) {
return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
.stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 857b385..e2479e0 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -39,15 +39,6 @@ public final class ReplicationUtils {
private ReplicationUtils() {
}
- /**
- * @param c Configuration to look at
- * @return True if replication for bulk load data is enabled.
- */
- public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
- return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
- HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
- }
-
public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
Configuration baseConf) throws ReplicationException {
Configuration otherConf;
@@ -135,4 +126,51 @@ public final class ReplicationUtils {
isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
}
}
+
+ /**
+ * Reads the bulk load replication switch from the configuration.
+ * @param c Configuration to look at
+ * @return True if replication for bulk load data is enabled.
+ */
+ public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
+ final boolean enabledByDefault = HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT;
+ return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, enabledByDefault);
+ }
+
+ /**
+ * Returns whether we should replicate the given table.
+ * <p>
+ * When the peer replicates all user tables, the table is rejected only if its namespace is in the
+ * exclude-namespaces set, or if the table is a key of the exclude table-cfs map with a null or
+ * empty family list (i.e. the whole table is excluded). Otherwise the table is accepted only if
+ * its namespace is in the namespaces set or the table is a key of the table-cfs map.
+ * @param peerConfig the peer configuration to consult
+ * @param tableName the table to test
+ * @return true if the table, or at least some of its families, should be replicated to the peer
+ */
+ public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
+ String namespace = tableName.getNamespaceAsString();
+ if (peerConfig.replicateAllUserTables()) {
+ // replicate all user tables, but filter by exclude namespaces and exclude table-cfs config
+ Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
+ if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
+ return false;
+ }
+ // Read the exclude map here; getTableCFsMap() is the include list that the
+ // non-replicate-all branch below consults.
+ Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap();
+ // trap here, must check existence first since HashMap allows null value.
+ if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
+ return true;
+ }
+ List<String> cfs = excludedTableCFs.get(tableName);
+ // if cfs is null or empty then we can make sure that we do not need to replicate this table,
+ // otherwise, we may still need to replicate the table but filter out some families.
+ return cfs != null && !cfs.isEmpty();
+ } else {
+ // Not replicate all user tables, so filter by namespaces and table-cfs config
+ Set<String> namespaces = peerConfig.getNamespaces();
+ Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
+
+ if (namespaces == null && tableCFs == null) {
+ return false;
+ }
+
+ // First filter by namespaces config
+ // If table's namespace in peer config, all the tables data are applicable for replication
+ if (namespaces != null && namespaces.contains(namespace)) {
+ return true;
+ }
+ return tableCFs != null && tableCFs.containsKey(tableName);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7d751fb..6d0b58b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -109,6 +110,7 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -371,6 +373,7 @@ public class HMaster extends HRegionServer implements MasterServices {
CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
+ private ReplicationBarrierCleaner replicationBarrierCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
private MobCompactionChore mobCompactChore;
private MasterMobCompactionThread mobCompactThread;
@@ -1179,19 +1182,30 @@ public class HMaster extends HRegionServer implements MasterServices {
getMasterWalManager().getOldLogDir());
getChoreService().scheduleChore(logCleaner);
- //start the hfile archive cleaner thread
+ // start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
Map<String, Object> params = new HashMap<>();
params.put(MASTER, this);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir, params);
getChoreService().scheduleChore(hfileCleaner);
+
+ replicationBarrierCleaner =
+ new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
+ getChoreService().scheduleChore(replicationBarrierCleaner);
+
serviceStarted = true;
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}
}
+ /**
+ * Null-safe cancel: does nothing when the given chore reference is null.
+ */
+ private void cancelChore(ScheduledChore chore) {
+ if (chore == null) {
+ return;
+ }
+ chore.cancel();
+ }
+
@Override
protected void stopServiceThreads() {
if (masterJettyServer != null) {
@@ -1205,24 +1219,33 @@ public class HMaster extends HRegionServer implements MasterServices {
super.stopServiceThreads();
stopChores();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping service threads");
- }
+ LOG.debug("Stopping service threads");
- // Clean up and close up shop
- if (this.logCleaner != null) this.logCleaner.cancel(true);
- if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
- if (this.quotaManager != null) this.quotaManager.stop();
+ if (this.quotaManager != null) {
+ this.quotaManager.stop();
+ }
- if (this.activeMasterManager != null) this.activeMasterManager.stop();
- if (this.serverManager != null) this.serverManager.stop();
- if (this.assignmentManager != null) this.assignmentManager.stop();
+ if (this.activeMasterManager != null) {
+ this.activeMasterManager.stop();
+ }
+ if (this.serverManager != null) {
+ this.serverManager.stop();
+ }
+ if (this.assignmentManager != null) {
+ this.assignmentManager.stop();
+ }
stopProcedureExecutor();
- if (this.walManager != null) this.walManager.stop();
- if (this.fileSystemManager != null) this.fileSystemManager.stop();
- if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
+ if (this.walManager != null) {
+ this.walManager.stop();
+ }
+ if (this.fileSystemManager != null) {
+ this.fileSystemManager.stop();
+ }
+ if (this.mpmHost != null) {
+ this.mpmHost.stop("server shutting down.");
+ }
}
private void startProcedureExecutor() throws IOException {
@@ -1261,37 +1284,21 @@ public class HMaster extends HRegionServer implements MasterServices {
}
+ /**
+ * Cancels the chores held by the master and closes the MOB compaction thread. Chores whose
+ * field is still null are skipped, see {@link #cancelChore(ScheduledChore)}.
+ */
private void stopChores() {
- if (this.expiredMobFileCleanerChore != null) {
- this.expiredMobFileCleanerChore.cancel(true);
- }
- if (this.mobCompactChore != null) {
- this.mobCompactChore.cancel(true);
- }
- if (this.balancerChore != null) {
- this.balancerChore.cancel(true);
- }
- if (this.normalizerChore != null) {
- this.normalizerChore.cancel(true);
- }
- if (this.clusterStatusChore != null) {
- this.clusterStatusChore.cancel(true);
- }
- if (this.catalogJanitorChore != null) {
- this.catalogJanitorChore.cancel(true);
- }
- if (this.clusterStatusPublisherChore != null){
- clusterStatusPublisherChore.cancel(true);
- }
+ cancelChore(this.expiredMobFileCleanerChore);
+ cancelChore(this.mobCompactChore);
+ cancelChore(this.balancerChore);
+ cancelChore(this.normalizerChore);
+ cancelChore(this.clusterStatusChore);
+ cancelChore(this.catalogJanitorChore);
+ cancelChore(this.clusterStatusPublisherChore);
if (this.mobCompactThread != null) {
this.mobCompactThread.close();
}
-
- if (this.quotaObserverChore != null) {
- quotaObserverChore.cancel();
- }
- if (this.snapshotQuotaChore != null) {
- snapshotQuotaChore.cancel();
- }
+ // both quota chores must be cancelled here, not only the snapshot one
+ cancelChore(this.quotaObserverChore);
+ cancelChore(this.snapshotQuotaChore);
+ cancelChore(this.logCleaner);
+ cancelChore(this.hfileCleaner);
+ cancelChore(this.replicationBarrierCleaner);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
new file mode 100644
index 0000000..16b8fc5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationBarrierCleaner.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to clean the useless barriers in {@link HConstants#REPLICATION_BARRIER_FAMILY_STR} family in
+ * meta table.
+ */
+@InterfaceAudience.Private
+public class ReplicationBarrierCleaner extends ScheduledChore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationBarrierCleaner.class);
+
+ private static final String REPLICATION_BARRIER_CLEANER_INTERVAL =
+ "hbase.master.cleaner.replication.barrier.interval";
+
+ // 12 hours. Regions are usually not moved, so the barriers are rarely updated. Use a large
+ // interval.
+ private static final int DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL = 12 * 60 * 60 * 1000;
+
+ private final Connection conn;
+
+ private final ReplicationPeerManager peerManager;
+
+ /**
+ * Creates the chore named "ReplicationBarrierCleaner".
+ * @param conf read for {@code hbase.master.cleaner.replication.barrier.interval}, the chore
+ *          period; when unset the default of {@code 12 * 60 * 60 * 1000} is used
+ * @param stopper passed on to the {@link ScheduledChore} constructor
+ * @param conn connection used by {@link #chore()} to open the meta table
+ * @param peerManager used by {@link #chore()} to find the serial peers of a table and their last
+ *          pushed sequence ids
+ */
+ public ReplicationBarrierCleaner(Configuration conf, Stoppable stopper, Connection conn,
+ ReplicationPeerManager peerManager) {
+ super("ReplicationBarrierCleaner", stopper, conf.getInt(REPLICATION_BARRIER_CLEANER_INTERVAL,
+ DEFAULT_REPLICATION_BARRIER_CLEANER_INTERVAL));
+ this.conn = conn;
+ this.peerManager = peerManager;
+ }
+
+ /**
+ * Scans the replication barrier family of the meta table and deletes the barriers that are no
+ * longer needed.
+ * <ul>
+ * <li>Rows without barriers are skipped.</li>
+ * <li>If the table has no serial peer, everything older than the newest barrier cell of the row
+ * is deleted.</li>
+ * <li>Otherwise the smallest last pushed sequence id among the serial peers is located in the
+ * sorted barriers. The row is dropped completely when that id is the last barrier minus one and
+ * the region has no catalog family any more; else only the barriers that precede the range
+ * containing that id are deleted.</li>
+ * </ul>
+ * A {@link ReplicationException} or {@link IOException} ends the current run and is logged at
+ * WARN. The summary line is logged whenever at least one row was scanned.
+ */
+ @Override
+ protected void chore() {
+ long totalRows = 0;
+ long cleanedRows = 0;
+ long deletedRows = 0;
+ long deletedBarriers = 0;
+ TableName tableName = null;
+ List<String> peerIds = null;
+ try (Table metaTable = conn.getTable(TableName.META_TABLE_NAME);
+ ResultScanner scanner = metaTable.getScanner(
+ new Scan().addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions())) {
+ for (;;) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ totalRows++;
+ long[] barriers = MetaTableAccessor.getReplicationBarriers(result);
+ if (barriers.length == 0) {
+ continue;
+ }
+ byte[] regionName = result.getRow();
+ TableName tn = RegionInfo.getTable(regionName);
+ // reuse the peer list while consecutive rows belong to the same table
+ if (!tn.equals(tableName)) {
+ tableName = tn;
+ peerIds = peerManager.getSerialPeerIdsBelongsTo(tableName);
+ }
+ if (peerIds.isEmpty()) {
+ // no serial replication, only keep the newest barrier
+ Cell cell = result.getColumnLatestCell(HConstants.REPLICATION_BARRIER_FAMILY,
+ HConstants.SEQNUM_QUALIFIER);
+ metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
+ cell.getTimestamp() - 1));
+ cleanedRows++;
+ deletedBarriers += barriers.length - 1;
+ continue;
+ }
+ String encodedRegionName = RegionInfo.encodeRegionName(regionName);
+ // the slowest serial peer decides what can be removed
+ long pushedSeqId = Long.MAX_VALUE;
+ for (String peerId : peerIds) {
+ pushedSeqId = Math.min(pushedSeqId,
+ peerManager.getQueueStorage().getLastSequenceId(encodedRegionName, peerId));
+ }
+ // barriers is sorted ascending and distinct, see MetaTableAccessor.getReplicationBarriers
+ int index = Arrays.binarySearch(barriers, pushedSeqId);
+ if (index == -1) {
+ // beyond the first barrier, usually this should not happen but anyway let's add a check
+ // for it.
+ continue;
+ }
+ // after this, index is the number of barriers that are less than or equal to pushedSeqId
+ if (index < 0) {
+ index = -index - 1;
+ } else {
+ index++;
+ }
+ // A special case for merged/split region, where we are in the last closed range and the
+ // pushedSeqId is the last barrier minus 1.
+ if (index == barriers.length - 1 && pushedSeqId == barriers[barriers.length - 1] - 1) {
+ // check if the region has already been removed, i.e, no catalog family
+ if (!metaTable.exists(new Get(regionName).addFamily(HConstants.CATALOG_FAMILY))) {
+ metaTable
+ .delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
+ deletedRows++;
+ deletedBarriers += barriers.length;
+ continue;
+ }
+ }
+ // the barrier before 'index - 1'(exclusive) can be safely removed. See the algorithm in
+ // SerialReplicationChecker for more details.
+ if (index - 1 > 0) {
+ List<Cell> cells = result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY,
+ HConstants.SEQNUM_QUALIFIER);
+ // NOTE(review): indexing from the end assumes cells are ordered newest first and map
+ // one-to-one to the distinct barriers - confirm
+ // All barriers before this cell(exclusive) can be removed
+ Cell cell = cells.get(cells.size() - index);
+ metaTable.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY,
+ cell.getTimestamp() - 1));
+ cleanedRows++;
+ deletedBarriers += index - 1;
+ }
+ }
+ } catch (ReplicationException | IOException e) {
+ LOG.warn("Failed to clean up replication barrier", e);
+ }
+ if (totalRows > 0) {
+ LOG.info(
+ "Cleanup replication barriers: " +
+ "totalRows {}, cleanedRows {}, deletedRows {}, deletedBarriers {}",
+ totalRows, cleanedRows, deletedRows, deletedBarriers);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 19cd89d..1e93373 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -327,6 +327,16 @@ public class ReplicationPeerManager {
}
}
+ /**
+ * Lists the ids of the serial peers whose config covers the given table, as decided by
+ * {@link ReplicationUtils#contains}.
+ */
+ public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
+ return peers.values().stream()
+ .filter(desc -> desc.getPeerConfig().isSerial()
+ && ReplicationUtils.contains(desc.getPeerConfig(), tableName))
+ .map(desc -> desc.getPeerId()).collect(Collectors.toList());
+ }
+
+ /**
+ * @return the {@code queueStorage} held by this manager
+ */
+ public ReplicationQueueStorage getQueueStorage() {
+ return queueStorage;
+ }
+
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 08c9f37..3a3200a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
@@ -53,44 +52,10 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
+ /**
+ * Lets the entry through only if {@link ReplicationUtils#contains} accepts the entry's table for
+ * the config of this filter's peer.
+ * @return the given entry, or null when it should not be replicated
+ */
@Override
public Entry filter(Entry entry) {
- TableName tabName = entry.getKey().getTableName();
- String namespace = tabName.getNamespaceAsString();
- ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
-
- if (peerConfig.replicateAllUserTables()) {
- // replicate all user tables, but filter by exclude namespaces config
- Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
-
- // return null(prevent replicating) if logKey's table is in this peer's
- // exclude namespaces list
- if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
- return null;
- }
-
+ if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
return entry;
} else {
- // Not replicate all user tables, so filter by namespaces and table-cfs config
- Set<String> namespaces = peerConfig.getNamespaces();
- Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
-
- if (namespaces == null && tableCFs == null) {
- return null;
- }
-
- // First filter by namespaces config
- // If table's namespace in peer config, all the tables data are applicable for replication
- if (namespaces != null && namespaces.contains(namespace)) {
- return entry;
- }
-
- // Then filter by table-cfs config
- // return null(prevent replicating) if logKey's table isn't in this peer's
- // replicable tables list
- if (tableCFs == null || !tableCFs.containsKey(tabName)) {
- return null;
- }
-
- return entry;
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
new file mode 100644
index 0000000..671bc22
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationBarrierCleaner.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationBarrierCleaner {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationBarrierCleaner.class);
+
+ // Logger must be bound to this test class, not the one it was copied from.
+ private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBarrierCleaner.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @Rule
+ public final TestName name = new TestName();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Cleans up the meta rows a test wrote: scans the catalog and replication barrier families and
+ * deletes every row whose table is not a system table.
+ */
+ @After
+ public void tearDown() throws IOException {
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+ ResultScanner scanner = table.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY)
+ .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter()))) {
+ for (;;) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ TableName tableName = RegionInfo.getTable(result.getRow());
+ if (!tableName.isSystemTable()) {
+ table.delete(new Delete(result.getRow()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Builds a mocked {@link ReplicationPeerManager}.
+ * @param queueStorage returned from {@code getQueueStorage()} when not null; with null that
+ *          method is left unstubbed
+ * @param firstPeerIds value stubbed for the first {@code getSerialPeerIdsBelongsTo} call
+ * @param peerIds values stubbed for the following calls, passed to Mockito's varargs
+ *          {@code thenReturn}
+ */
+ private ReplicationPeerManager create(ReplicationQueueStorage queueStorage,
+ List<String> firstPeerIds, @SuppressWarnings("unchecked") List<String>... peerIds) {
+ ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
+ if (queueStorage != null) {
+ when(peerManager.getQueueStorage()).thenReturn(queueStorage);
+ }
+ if (peerIds.length == 0) {
+ when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds);
+ } else {
+ when(peerManager.getSerialPeerIdsBelongsTo(any(TableName.class))).thenReturn(firstPeerIds,
+ peerIds);
+ }
+ return peerManager;
+ }
+
+ /**
+ * Builds a mocked {@link ReplicationQueueStorage} whose {@code getLastSequenceId} ignores its
+ * arguments.
+ * @param lastPushedSeqId value stubbed for the first call
+ * @param lastPushedSeqIds values stubbed for the following calls, passed to Mockito's varargs
+ *          {@code thenReturn}
+ */
+ private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
+ throws ReplicationException {
+ ReplicationQueueStorage queueStorage = mock(ReplicationQueueStorage.class);
+ if (lastPushedSeqIds.length == 0) {
+ when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
+ } else {
+ when(queueStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
+ lastPushedSeqIds);
+ }
+ return queueStorage;
+ }
+
+ /**
+ * Creates the cleaner under test on the mini cluster's configuration and connection, with a
+ * stopper that never reports stopped.
+ */
+ private ReplicationBarrierCleaner create(ReplicationPeerManager peerManager) throws IOException {
+ return new ReplicationBarrierCleaner(UTIL.getConfiguration(), new WarnOnlyStoppable(),
+ UTIL.getConnection(), peerManager);
+ }
+
+ /**
+ * Writes the given barriers for the region into meta with a single Put. Barrier {@code i} gets
+ * the timestamp {@code putTimestamp - barriers.length + i}, so later array elements carry newer
+ * timestamps.
+ */
+ private void addBarrier(RegionInfo region, long... barriers) throws IOException {
+ Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+ for (int i = 0; i < barriers.length; i++) {
+ put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+ put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
+ }
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ table.put(put);
+ }
+ }
+
+ /**
+ * Puts a dummy cell into the catalog family of the region's meta row, so that the row has a
+ * catalog family.
+ */
+ private void fillCatalogFamily(RegionInfo region) throws IOException {
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ table.put(new Put(region.getRegionName()).addColumn(HConstants.CATALOG_FAMILY,
+ Bytes.toBytes("whatever"), Bytes.toBytes("whatever")));
+ }
+ }
+
+ /**
+ * Deletes the catalog family of the region's meta row.
+ */
+ private void clearCatalogFamily(RegionInfo region) throws IOException {
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ table.delete(new Delete(region.getRegionName()).addFamily(HConstants.CATALOG_FAMILY));
+ }
+ }
+
+ /**
+ * Runs the chore without adding any barriers and verifies the peer manager is never consulted.
+ */
+ @Test
+ public void testNothing() throws IOException {
+ ReplicationPeerManager peerManager = mock(ReplicationPeerManager.class);
+ ReplicationBarrierCleaner cleaner = create(peerManager);
+ cleaner.chore();
+ verify(peerManager, never()).getSerialPeerIdsBelongsTo(any(TableName.class));
+ verify(peerManager, never()).getQueueStorage();
+ }
+
+ /**
+ * With no serial peers every region must end up with only its newest barrier, the queue storage
+ * must not be touched, and the peer list must be looked up once per table.
+ */
+ @Test
+ public void testCleanNoPeers() throws IOException {
+ TableName tableName1 = TableName.valueOf(name.getMethodName() + "_1");
+ RegionInfo region11 =
+ RegionInfoBuilder.newBuilder(tableName1).setEndKey(Bytes.toBytes(1)).build();
+ addBarrier(region11, 10, 20, 30, 40, 50, 60);
+ RegionInfo region12 =
+ RegionInfoBuilder.newBuilder(tableName1).setStartKey(Bytes.toBytes(1)).build();
+ addBarrier(region12, 20, 30, 40, 50, 60, 70);
+
+ TableName tableName2 = TableName.valueOf(name.getMethodName() + "_2");
+ RegionInfo region21 =
+ RegionInfoBuilder.newBuilder(tableName2).setEndKey(Bytes.toBytes(1)).build();
+ addBarrier(region21, 100, 200, 300, 400);
+ RegionInfo region22 =
+ RegionInfoBuilder.newBuilder(tableName2).setStartKey(Bytes.toBytes(1)).build();
+ addBarrier(region22, 200, 300, 400, 500, 600);
+
+ @SuppressWarnings("unchecked")
+ ReplicationPeerManager peerManager =
+ create(null, Collections.emptyList(), Collections.emptyList());
+ ReplicationBarrierCleaner cleaner = create(peerManager);
+ cleaner.chore();
+
+ // should never call this method
+ verify(peerManager, never()).getQueueStorage();
+ // should only be called twice although we have 4 regions to clean
+ verify(peerManager, times(2)).getSerialPeerIdsBelongsTo(any(TableName.class));
+
+ assertArrayEquals(new long[] { 60 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region11.getRegionName()));
+ assertArrayEquals(new long[] { 70 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region12.getRegionName()));
+
+ assertArrayEquals(new long[] { 400 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region21.getRegionName()));
+ assertArrayEquals(new long[] { 600 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region22.getRegionName()));
+ }
+
+ /**
+ * Runs the chore five times against one region with two serial peers and checks which barriers
+ * survive as the mocked last pushed sequence ids advance.
+ */
+ @Test
+ public void testDeleteBarriers() throws IOException, ReplicationException {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+ addBarrier(region, 10, 20, 30, 40, 50, 60);
+ // two peers
+ // The ids below are consumed two per chore() run (one per peer) and the cleaner uses the
+ // smaller of each pair, i.e. -1, 15, 20, 55 and 70 for the five runs.
+ ReplicationQueueStorage queueStorage = create(-1L, 2L, 15L, 25L, 20L, 25L, 65L, 55L, 70L, 70L);
+ List<String> peerIds = Lists.newArrayList("1", "2");
+
+ @SuppressWarnings("unchecked")
+ ReplicationPeerManager peerManager =
+ create(queueStorage, peerIds, peerIds, peerIds, peerIds, peerIds);
+ ReplicationBarrierCleaner cleaner = create(peerManager);
+
+ // beyond the first barrier, no deletion
+ cleaner.chore();
+ assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+ // in the first range, still no deletion
+ cleaner.chore();
+ assertArrayEquals(new long[] { 10, 20, 30, 40, 50, 60 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+ // in the second range, 10 is deleted
+ cleaner.chore();
+ assertArrayEquals(new long[] { 20, 30, 40, 50, 60 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+ // between 50 and 60, so the barriers before 50 will be deleted
+ cleaner.chore();
+ assertArrayEquals(new long[] { 50, 60 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+ // in the last open range, 50 is deleted
+ cleaner.chore();
+ assertArrayEquals(new long[] { 60 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+ }
+
+ /**
+ * Uses a pushed sequence id of 59, i.e. the last barrier (60) minus one: the row is only trimmed
+ * while the region still has a catalog family, and removed entirely once that family is gone.
+ */
+ @Test
+ public void testDeleteRowForDeletedRegion() throws IOException, ReplicationException {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+ addBarrier(region, 40, 50, 60);
+ fillCatalogFamily(region);
+
+ ReplicationQueueStorage queueStorage = create(59L);
+ @SuppressWarnings("unchecked")
+ ReplicationPeerManager peerManager = create(queueStorage, Lists.newArrayList("1"));
+ ReplicationBarrierCleaner cleaner = create(peerManager);
+
+ // we have something in catalog family, so only delete 40
+ cleaner.chore();
+ assertArrayEquals(new long[] { 50, 60 },
+ MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), region.getRegionName()));
+
+ // No catalog family, then we should remove the whole row
+ clearCatalogFamily(region);
+ cleaner.chore();
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ assertFalse(table
+ .exists(new Get(region.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY)));
+ }
+ }
+
+ /**
+ * A {@link Stoppable} that only logs a warning when asked to stop and always reports itself as
+ * not stopped.
+ */
+ private static class WarnOnlyStoppable implements Stoppable {
+ @Override
+ public void stop(String why) {
+ LOG.warn("TestReplicationBarrierCleaner received stop, ignoring. Reason: " + why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/644bfe36/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 58e9543..29749bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -157,7 +157,7 @@ public class TestSerialReplicationChecker {
}
for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
- put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
+ put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
}
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);