You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/18 13:57:27 UTC

[hbase] 04/11: HBASE-27215 Add support for sync replication (#4762)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d24e7cf39f98beeaa94c997953171a71bbab2447
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Sep 15 22:58:29 2022 +0800

    HBASE-27215 Add support for sync replication (#4762)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../regionserver/ReplicationSource.java            |  2 +-
 .../regionserver/ReplicationSourceManager.java     | 53 +++++++++++-----------
 .../TestDrainReplicationQueuesForStandBy.java      |  3 --
 3 files changed, 28 insertions(+), 30 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index e078722b157..0784a87711b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -465,7 +465,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
             t.getName());
           manager.refreshSources(peerId);
           break;
-        } catch (IOException e1) {
+        } catch (IOException | ReplicationException e1) {
           LOG.error("Replication sources refresh failed.", e1);
           sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 03569be86fc..f3d07315240 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -403,38 +403,44 @@ public class ReplicationSourceManager {
     // TODO: use empty initial offsets for now, revisit when adding support for sync replication
     ReplicationSourceInterface src =
       createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
-    // synchronized here to avoid race with preLogRoll where we add new log to source and also
+    // synchronized here to avoid race with postLogRoll where we add new log to source and also
     // walsById.
     ReplicationSourceInterface toRemove;
-    Map<String, NavigableSet<String>> wals = new HashMap<>();
+    ReplicationQueueData queueData;
     synchronized (latestPaths) {
+      // Here we record, for each wal group, the last remaining wal file as the new offset, and
+      // then persist these offsets to the replication queue storage after releasing the lock. It
+      // is not safe to just remove the old map from walsById since later we may fail to update the
+      // replication queue storage, and when we retry next time, we cannot know which wal files
+      // need to be set in the replication queue storage.
+      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
+      synchronized (walsById) {
+        walsById.get(queueId).forEach((group, wals) -> {
+          if (!wals.isEmpty()) {
+            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
+          }
+        });
+      }
+      queueData = new ReplicationQueueData(queueId, builder.build());
+      src = createSource(queueData, peer);
       toRemove = sources.put(peerId, src);
       if (toRemove != null) {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
         toRemove.terminate(terminateMessage);
         toRemove.getSourceMetrics().clear();
       }
-      // Here we make a copy of all the remaining wal files and then delete them from the
-      // replication queue storage after releasing the lock. It is not safe to just remove the old
-      // map from walsById since later we may fail to delete them from the replication queue
-      // storage, and when we retry next time, we can not know the wal files that need to be deleted
-      // from the replication queue storage.
-      walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+    }
+    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
+      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
-    for (NavigableSet<String> walsByGroup : wals.values()) {
-      // TODO: just need to reset the replication offset
-      // for (String wal : walsByGroup) {
-      // queueStorage.removeWAL(server.getServerName(), peerId, wal);
-      // }
-    }
     synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(queueId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
+      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
+      queueData.getOffsets().forEach((group, offset) -> {
+        NavigableSet<String> walsByGroup = wals.get(group);
         if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
+          walsByGroup.headSet(offset.getWal(), true).clear();
         }
       });
     }
@@ -457,13 +463,8 @@ public class ReplicationSourceManager {
   }
 
   private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
-    ReplicationPeer peer) throws IOException {
-    Map<String, ReplicationGroupOffset> offsets;
-    try {
-      offsets = queueStorage.getOffsets(queueId);
-    } catch (ReplicationException e) {
-      throw new IOException(e);
-    }
+    ReplicationPeer peer) throws IOException, ReplicationException {
+    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
     return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
   }
 
@@ -473,7 +474,7 @@ public class ReplicationSourceManager {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId
       + " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
index 8918f8422e1..0189d475575 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
@@ -35,12 +35,9 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-// TODO: revisit later
-@Ignore
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {