You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/11/09 07:12:05 UTC

hbase git commit: HBASE-21451 The way we maintain the latestPaths in ReplicationSourceManager is broken when sync replication is used

Repository: hbase
Updated Branches:
  refs/heads/master fa6373660 -> fe2265fa4


HBASE-21451 The way we maintain the latestPaths in ReplicationSourceManager is broken when sync replication is used


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fe2265fa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fe2265fa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fe2265fa

Branch: refs/heads/master
Commit: fe2265fa4a1e828b2e68ff8e42639c5942dccb1b
Parents: fa63736
Author: Duo Zhang <zh...@apache.org>
Authored: Thu Nov 8 15:01:38 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Fri Nov 9 14:53:33 2018 +0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSourceManager.java  | 40 ++++++++++----------
 .../TestReplicationSourceManager.java           | 16 ++++++++
 2 files changed, 35 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2265fa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5756cbc..20c1215 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +70,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -148,7 +148,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Configuration conf;
   private final FileSystem fs;
   // The paths to the latest log of each wal group, for new coming peers
-  private final Set<Path> latestPaths;
+  private final Map<String, Path> latestPaths;
   // Path to the wals directories
   private final Path logDir;
   // Path to the wal archive
@@ -216,7 +216,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setNameFormat("ReplicationExecutor-%d");
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
-    this.latestPaths = new HashSet<Path>();
+    this.latestPaths = new HashMap<>();
     this.replicationForBulkLoadDataEnabled = conf.getBoolean(
       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
     this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
@@ -371,17 +371,16 @@ public class ReplicationSourceManager implements ReplicationListener {
       Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
       this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
-      if (this.latestPaths.size() > 0) {
-        for (Path logPath : latestPaths) {
-          String name = logPath.getName();
-          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
-          NavigableSet<String> logs = new TreeSet<>();
-          logs.add(name);
-          walsByGroup.put(walPrefix, logs);
+      if (!latestPaths.isEmpty()) {
+        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
+          Path walPath = walPrefixAndPath.getValue();
+          NavigableSet<String> wals = new TreeSet<>();
+          wals.add(walPath.getName());
+          walsByGroup.put(walPrefixAndPath.getKey(), wals);
           // Abort RS and throw exception to make add peer failed
           abortAndThrowIOExceptionWhenFail(
-            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
-          src.enqueueLog(logPath);
+            () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
+          src.enqueueLog(walPath);
         }
       }
     }
@@ -780,15 +779,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
 
       // Add to latestPaths
-      Iterator<Path> iterator = latestPaths.iterator();
-      while (iterator.hasNext()) {
-        Path path = iterator.next();
-        if (path.getName().contains(logPrefix)) {
-          iterator.remove();
-          break;
-        }
-      }
-      this.latestPaths.add(newLog);
+      latestPaths.put(logPrefix, newLog);
     }
   }
 
@@ -1054,6 +1045,13 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   @VisibleForTesting
+  Set<Path> getLastestPath() {
+    synchronized (latestPaths) {
+      return Sets.newHashSet(latestPaths.values());
+    }
+  }
+
+  @VisibleForTesting
   public AtomicLong getTotalBufferUsed() {
     return totalBufferUsed;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2265fa/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index febe764..0872ea7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -646,6 +646,22 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
+  @Test
+  public void testSameWALPrefix() throws IOException {
+    Set<String> latestWalsBefore =
+      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
+    String walName1 = "localhost,8080,12345-45678-Peer.34567";
+    String walName2 = "localhost,8080,12345.56789";
+    manager.preLogRoll(new Path(walName1));
+    manager.preLogRoll(new Path(walName2));
+
+    Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
+      .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
+    assertEquals(2, latestWals.size());
+    assertTrue(latestWals.contains(walName1));
+    assertTrue(latestWals.contains(walName2));
+  }
+
   /**
    * Add a peer and wait for it to initialize
    * @param waitForSource Whether to wait for replication source to initialize