You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/11/27 15:50:46 UTC

[hbase] branch branch-2 updated: HBASE-27463 Reset sizeOfLogQueue when refresh replication source (#4863)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 983eb0c0be0 HBASE-27463 Reset sizeOfLogQueue when refresh replication source (#4863)
983eb0c0be0 is described below

commit 983eb0c0be09cc65def9f452545befbb07345adf
Author: Ruanhui <32...@users.noreply.github.com>
AuthorDate: Sun Nov 27 23:07:40 2022 +0800

    HBASE-27463 Reset sizeOfLogQueue when refresh replication source (#4863)
    
    Co-authored-by: huiruan <hu...@tencent.com>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Reviewed-by: Rushabh Shah <sh...@gmail.com>
    (cherry picked from commit bb9f43c6f9079b196e7a9d5a5eb35e721b092052)
---
 .../regionserver/ReplicationSourceManager.java     | 28 ++++++++--------------
 1 file changed, 10 insertions(+), 18 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index c6d19a8d233..f32e62b749a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -97,15 +97,11 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
  * {@link #preLogRoll(Path)}.</li>
  * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
- * modify it, {@link #removePeer(String)} , <<<<<<< HEAD
+ * modify it, {@link #removePeer(String)},
  * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by =======
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
  * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by >>>>>>>
- * 51893b9ba3... HBASE-26029 It is not reliable to use nodeDeleted event to track region server's
- * death (#3430) {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
+ * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
+ * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
  * {@link ReplicationSourceInterface} firstly, then remove the wals from
  * {@link #walsByIdRecoveredQueues}. And
  * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
@@ -368,22 +364,18 @@ public class ReplicationSourceManager {
     String terminateMessage = "Peer " + peerId
       + " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
-    ReplicationSourceInterface src = createSource(peerId, peer);
+    ReplicationSourceInterface src;
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
-      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
+      ReplicationSourceInterface toRemove = this.sources.remove(peerId);
       if (toRemove != null) {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
-        // Do not clear metrics
-        toRemove.terminate(terminateMessage, null, false);
+        toRemove.terminate(terminateMessage, null, true);
       }
-      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
-        walsByGroup.forEach(wal -> {
-          Path walPath = new Path(this.logDir, wal);
-          src.enqueueLog(walPath);
-          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
-        });
-
+      src = createSource(peerId, peer);
+      this.sources.put(peerId, src);
+      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
+        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
       }
     }
     LOG.info("Startup replication source for " + src.getPeerId());