You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:42 UTC
[hbase] 02/10: HBASE-24681 Remove the cache
walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit b60ec36c292cb7732422432e9d6215ef9e01bfdb
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Jul 13 17:35:32 2020 +0800
HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../regionserver/ReplicationSourceManager.java | 214 ++++++---------------
1 file changed, 63 insertions(+), 151 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 21979bb..00ee6a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -98,30 +98,6 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
* is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
* operations.</li>
- * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
- * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
- * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
- * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link #preLogRoll(Path)}.</li>
- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
- * modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
- * {@link ReplicationSourceInterface} firstly, then remove the wals from
- * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
- * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
- * {@link ReplicationSourceInterface}. So there is no race here. For
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
- * is already synchronized on {@link #oldsources}. So no need synchronized on
- * {@link #walsByIdRecoveredQueues}.</li>
* <li>Synchronize on {@link #latestPaths} so that a newly opened source does not miss a new log.</li>
* <li>Synchronize on {@link #oldsources} to avoid adding a recovered source for a
* to-be-removed peer.</li>
@@ -134,15 +110,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final ConcurrentMap<String, ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources;
-
- /**
- * Storage for queues that need persistance; e.g. Replication state so can be recovered
- * after a crash. queueStorage upkeep is spread about this class and passed
- * to ReplicationSource instances for these to do updates themselves. Not all ReplicationSource
- * instances keep state.
- */
private final ReplicationQueueStorage queueStorage;
-
private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
@@ -150,15 +118,6 @@ public class ReplicationSourceManager implements ReplicationListener {
// All about stopping
private final Server server;
- // All logs we are currently tracking
- // Index structure of the map is: queue_id->logPrefix/logGroup->logs
- // For normal replication source, the peer id is same with the queue id
- private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
- // Logs for recovered sources we are currently tracking
- // the map is: queue_id->logPrefix/logGroup->logs
- // For recovered source, the queue id's format is peer_id-servername-*
- private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
-
private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
private final Configuration conf;
@@ -221,8 +180,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
- this.walsById = new ConcurrentHashMap<>();
- this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
this.oldsources = new ArrayList<>();
this.conf = conf;
this.fs = fs;
@@ -359,7 +316,6 @@ public class ReplicationSourceManager implements ReplicationListener {
// Delete queue from storage and memory and queue id is same with peer id for normal
// source
deleteQueue(peerId);
- this.walsById.remove(peerId);
}
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
if (peerConfig.isSyncReplication()) {
@@ -401,15 +357,10 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on latestPaths to avoid missing the new log
synchronized (this.latestPaths) {
this.sources.put(peerId, src);
- Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
- this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
if (!latestPaths.isEmpty()) {
for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
Path walPath = walPrefixAndPath.getValue();
- NavigableSet<String> wals = new TreeSet<>();
- wals.add(walPath.getName());
- walsByGroup.put(walPrefixAndPath.getKey(), wals);
// Abort RS and throw exception to make add peer failed
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
@@ -463,7 +414,10 @@ public class ReplicationSourceManager implements ReplicationListener {
// map from walsById since later we may fail to delete them from the replication queue
// storage, and when we retry next time, we can not know the wal files that need to be deleted
// from the replication queue storage.
- walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+ this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId).forEach(wal -> {
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ wals.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+ });
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
@@ -472,15 +426,6 @@ public class ReplicationSourceManager implements ReplicationListener {
queueStorage.removeWAL(server.getServerName(), peerId, wal);
}
}
- synchronized (walsById) {
- Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
- wals.forEach((k, v) -> {
- NavigableSet<String> walsByGroup = oldWals.get(k);
- if (walsByGroup != null) {
- walsByGroup.removeAll(v);
- }
- });
- }
// synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
// a background task, we will delete the file from replication queue storage under the lock to
// simplify the logic.
@@ -492,7 +437,6 @@ public class ReplicationSourceManager implements ReplicationListener {
oldSource.terminate(terminateMessage);
oldSource.getSourceMetrics().clear();
queueStorage.removeQueue(server.getServerName(), queueId);
- walsByIdRecoveredQueues.remove(queueId);
iter.remove();
}
}
@@ -505,7 +449,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
*/
- public void refreshSources(String peerId) throws IOException {
+ public void refreshSources(String peerId) throws ReplicationException, IOException {
String terminateMessage = "Peer " + peerId +
" state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -518,9 +462,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
}
- for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
- walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
- }
+ this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId)
+ .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
@@ -541,9 +484,8 @@ public class ReplicationSourceManager implements ReplicationListener {
for (String queueId : previousQueueIds) {
ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
this.oldsources.add(recoveredReplicationSource);
- for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
- walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
- }
+ this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)
+ .forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
toStartup.add(recoveredReplicationSource);
}
}
@@ -563,7 +505,6 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Done with the recovered queue {}", src.getQueueId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
- this.walsByIdRecoveredQueues.remove(src.getQueueId());
return true;
}
@@ -586,8 +527,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.sources.remove(src.getPeerId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
- this.walsById.remove(src.getQueueId());
-
}
/**
@@ -672,42 +611,19 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param inclusive whether we should also remove the given log file
* @param source the replication source
*/
- void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
- String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
- if (source.isRecovered()) {
- NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
- if (wals != null) {
- NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
- if (walsToRemove.isEmpty()) {
- return;
- }
- cleanOldLogs(walsToRemove, source);
- walsToRemove.clear();
- }
- } else {
- NavigableSet<String> wals;
- NavigableSet<String> walsToRemove;
- // synchronized on walsById to avoid race with preLogRoll
- synchronized (this.walsById) {
- wals = walsById.get(source.getQueueId()).get(logPrefix);
- if (wals == null) {
- return;
- }
- walsToRemove = wals.headSet(log, inclusive);
- if (walsToRemove.isEmpty()) {
- return;
- }
- walsToRemove = new TreeSet<>(walsToRemove);
- }
- // cleanOldLogs may spend some time, especially for sync replication where we may want to
- // remove remote wals as the remote cluster may have already been down, so we do it outside
- // the lock to avoid block preLogRoll
- cleanOldLogs(walsToRemove, source);
- // now let's remove the files in the set
- synchronized (this.walsById) {
- wals.removeAll(walsToRemove);
- }
+ void cleanOldLogs(String log, boolean inclusive,
+ ReplicationSourceInterface source) {
+ NavigableSet<String> walsToRemove; // wals of the source's queue, same wal group as 'log', ordered before it
+ synchronized (this.latestPaths) { // NOTE(review): presumably the lock preLogRoll holds while adding a new wal - confirm
+ walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
+ }
+ if (walsToRemove.isEmpty()) {
+ return; // nothing to clean; also the case when the queue could not be read
}
+ // Cleaning may take a while, especially for sync replication where remote wals may have to be
+ // removed while the remote cluster is already down, so it is done outside the lock to avoid
+ // blocking preLogRoll.
+ cleanOldLogs(walsToRemove, source);
}
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
@@ -787,37 +703,6 @@ public class ReplicationSourceManager implements ReplicationListener {
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
}
-
- // synchronized on walsById to avoid race with cleanOldLogs
- synchronized (this.walsById) {
- // Update walsById map
- for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
- .entrySet()) {
- String peerId = entry.getKey();
- Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
- boolean existingPrefix = false;
- for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
- SortedSet<String> wals = walsEntry.getValue();
- if (this.sources.isEmpty()) {
- // If there's no slaves, don't need to keep the old wals since
- // we only consider the last one when a new slave comes in
- wals.clear();
- }
- if (logPrefix.equals(walsEntry.getKey())) {
- wals.add(logName);
- existingPrefix = true;
- }
- }
- if (!existingPrefix) {
- // The new log belongs to a new group, add it into this peer
- LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
- NavigableSet<String> wals = new TreeSet<>();
- wals.add(logName);
- walsByPrefix.put(logPrefix, wals);
- }
- }
- }
-
// Add to latestPaths
latestPaths.put(logPrefix, newLog);
}
@@ -987,18 +872,6 @@ public class ReplicationSourceManager implements ReplicationListener {
continue;
}
}
- // track sources in walsByIdRecoveredQueues
- Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
- walsByIdRecoveredQueues.put(queueId, walsByGroup);
- for (String wal : walsSet) {
- String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
- NavigableSet<String> wals = walsByGroup.get(walPrefix);
- if (wals == null) {
- wals = new TreeSet<>();
- walsByGroup.put(walPrefix, wals);
- }
- wals.add(wal);
- }
oldsources.add(src);
LOG.info("Added source for recovered queue {}", src.getQueueId());
for (String wal : walsSet) {
@@ -1032,7 +905,18 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the normal sources on this rs
* @return an unmodifiable map of queue id to wal group prefix to the sorted set of wal names
*/
- public Map<String, Map<String, NavigableSet<String>>> getWALs() {
+ public Map<String, Map<String, NavigableSet<String>>> getWALs()
+ throws ReplicationException { // propagated from queueStorage.getWALsInQueue
+ Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>(); // queue id -> wal prefix -> wals
+ for (ReplicationSourceInterface source : sources.values()) { // one entry per normal source
+ String queueId = source.getQueueId();
+ Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+ walsById.put(queueId, walsByGroup); // registered even if the queue turns out to hold no wals
+ for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal); // group wals by prefix, sorted by name
+ }
+ }
return Collections.unmodifiableMap(walsById); // only the outer map is wrapped; inner maps and sets stay mutable
}
@@ -1040,7 +924,18 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the recovered sources on this rs
* @return an unmodifiable map of queue id to wal group prefix to the sorted set of wal names
*/
- Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
+ Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues()
+ throws ReplicationException { // propagated from queueStorage.getWALsInQueue
+ Map<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues = new HashMap<>(); // queue id -> wal prefix -> wals
+ for (ReplicationSourceInterface source : oldsources) { // NOTE(review): iterated without synchronized (oldsources) - confirm no concurrent add/remove
+ String queueId = source.getQueueId();
+ Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+ walsByIdRecoveredQueues.put(queueId, walsByGroup); // registered even if the queue turns out to hold no wals
+ for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal); // group wals by prefix, sorted by name
+ }
+ }
return Collections.unmodifiableMap(walsByIdRecoveredQueues); // only the outer map is wrapped; inner maps and sets stay mutable
}
@@ -1286,4 +1181,21 @@ public class ReplicationSourceManager implements ReplicationListener {
}
return crs.startup();
}
+ // Returns the wals of the given queue that share the wal group prefix of 'log' and sort before it (including it when 'inclusive').
+ private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
+ NavigableSet<String> walsToRemove = new TreeSet<>();
+ String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+ try {
+ this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ if (walPrefix.equals(logPrefix)) { // keep only wals of the same group as 'log'
+ walsToRemove.add(wal);
+ }
+ });
+ } catch (ReplicationException e) {
+ // Best effort: only log and return an empty result; a later cleanup is expected to retry (NOTE(review): stated for recovered sources - confirm for normal ones).
+ LOG.warn("Failed to read wals in queue {}", queueId, e);
+ }
+ return walsToRemove.headSet(log, inclusive); // view of the names ordered before 'log' by natural String ordering
+ }
}