You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/07/27 16:22:39 UTC

[hbase] branch branch-1 updated: HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 8f82cbf  HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)
8f82cbf is described below

commit 8f82cbfbcbb03bf0787cc2c018d664f7f7263130
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Jul 27 23:14:15 2021 +0800

    HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Michael Stack <st...@apache.org>
    
    Conflicts:
    	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
---
 .../regionserver/ReplicationSourceManager.java     | 42 ++++++++++------------
 .../TestReplicationSourceManagerManager.java       | 15 ++++++++
 2 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2d6c29a..8d669a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -19,14 +19,13 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -108,7 +107,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Configuration conf;
   private final FileSystem fs;
   // The paths to the latest log of each wal group, for new coming peers
-  private Set<Path> latestPaths;
+  private final Map<String, Path> latestPaths;
   // Path to the wals directories
   private final Path logDir;
   // Path to the wal archive
@@ -171,7 +170,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
     this.rand = new Random();
-    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
+    this.latestPaths = new HashMap<>();
     replicationForBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
@@ -305,23 +304,22 @@ public class ReplicationSourceManager implements ReplicationListener {
       this.walsById.put(id, walsByGroup);
       // Add the latest wal to that source's queue
       synchronized (latestPaths) {
-        if (this.latestPaths.size() > 0) {
-          for (Path logPath : latestPaths) {
-            String name = logPath.getName();
-            String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
-            SortedSet<String> logs = new TreeSet<String>();
-            logs.add(name);
-            walsByGroup.put(walPrefix, logs);
+        if (!latestPaths.isEmpty()) {
+          for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
+            Path walPath = walPrefixAndPath.getValue();
+            SortedSet<String> wals = new TreeSet<>();
+            wals.add(walPath.getName());
+            walsByGroup.put(walPrefixAndPath.getKey(), wals);
             try {
-              this.replicationQueues.addLog(id, name);
+              this.replicationQueues.addLog(id, walPath.getName());
             } catch (ReplicationException e) {
               String message =
                   "Cannot add log to queue when creating a new source, queueId=" + id
-                      + ", filename=" + name;
+                      + ", filename=" + walPath.getName();
               server.stop(message);
               throw e;
             }
-            src.enqueueLog(logPath);
+            src.enqueueLog(walPath);
           }
         }
       }
@@ -409,15 +407,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     String logName = newLog.getName();
     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
     synchronized (latestPaths) {
-      Iterator<Path> iterator = latestPaths.iterator();
-      while (iterator.hasNext()) {
-        Path path = iterator.next();
-        if (path.getName().contains(logPrefix)) {
-          iterator.remove();
-          break;
-        }
-      }
-      this.latestPaths.add(newLog);
+      latestPaths.put(logPrefix, newLog);
     }
   }
 
@@ -693,6 +683,12 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
+  Set<Path> getLastestPath() {
+    synchronized (latestPaths) {
+      return Sets.newHashSet(latestPaths.values());
+    }
+  }
+
   /**
    * Class responsible to setup new ReplicationSources to take care of the
    * queues from dead region servers.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
index 50c96cf..cdff735 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerManager.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -509,6 +510,20 @@ public class TestReplicationSourceManagerManager extends TestReplicationSourceMa
     });
   }
 
+  @Test
+  public void testSameWALPrefix() throws IOException {
+    Set<Path> latestWalsBefore = manager.getLastestPath();
+    Path walName1 = new Path("localhost,8080,12345-45678-Peer.34567");
+    Path walName2 = new Path("localhost,8080,12345.56789");
+    manager.preLogRoll(walName1);
+    manager.preLogRoll(walName2);
+    Set<Path> latestWals = manager.getLastestPath();
+    latestWals.removeAll(latestWalsBefore);
+    assertEquals(2, latestWals.size());
+    assertTrue(latestWals.contains(walName1));
+    assertTrue(latestWals.contains(walName2));
+  }
+
   private WALEdit getBulkLoadWALEdit() {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);