You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/07/27 15:14:50 UTC
[hbase] branch branch-2 updated: HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new dd22fec HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)
dd22fec is described below
commit dd22fecc2b0d1b83ddd26733eb79eda9a5fa8c31
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Jul 27 23:14:15 2021 +0800
HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Michael Stack <st...@apache.org>
---
.../regionserver/ReplicationSourceManager.java | 40 ++++++++++------------
.../regionserver/TestReplicationSourceManager.java | 16 +++++++++
2 files changed, 35 insertions(+), 21 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index fe83168..fa44530 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -66,6 +65,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -154,7 +154,7 @@ public class ReplicationSourceManager {
private final Configuration conf;
private final FileSystem fs;
// The paths to the latest log of each wal group, for new coming peers
- private final Set<Path> latestPaths;
+ private final Map<String, Path> latestPaths;
// Path to the wals directories
private final Path logDir;
// Path to the wal archive
@@ -225,7 +225,7 @@ public class ReplicationSourceManager {
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
- this.latestPaths = new HashSet<Path>();
+ this.latestPaths = new HashMap<>();
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
@@ -345,17 +345,17 @@ public class ReplicationSourceManager {
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
- if (this.latestPaths.size() > 0) {
- for (Path logPath : latestPaths) {
- String name = logPath.getName();
- String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
- NavigableSet<String> logs = new TreeSet<>();
- logs.add(name);
- walsByGroup.put(walPrefix, logs);
+ if (!latestPaths.isEmpty()) {
+ for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
+ Path walPath = walPrefixAndPath.getValue();
+ NavigableSet<String> wals = new TreeSet<>();
+ wals.add(walPath.getName());
+ walsByGroup.put(walPrefixAndPath.getKey(), wals);
// Abort RS and throw exception to make add peer failed
abortAndThrowIOExceptionWhenFail(
- () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
- src.enqueueLog(logPath);
+ () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
+ src.enqueueLog(walPath);
+ LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
}
}
}
@@ -606,15 +606,7 @@ public class ReplicationSourceManager {
}
// Add to latestPaths
- Iterator<Path> iterator = latestPaths.iterator();
- while (iterator.hasNext()) {
- Path path = iterator.next();
- if (path.getName().contains(logPrefix)) {
- iterator.remove();
- break;
- }
- }
- this.latestPaths.add(newLog);
+ latestPaths.put(logPrefix, newLog);
}
}
@@ -795,6 +787,12 @@ public class ReplicationSourceManager {
}
}
+ Set<Path> getLastestPath() {
+ synchronized (latestPaths) {
+ return Sets.newHashSet(latestPaths.values());
+ }
+ }
+
public AtomicLong getTotalBufferUsed() {
return totalBufferUsed;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 8aad2b1..422ab1c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -640,6 +640,22 @@ public abstract class TestReplicationSourceManager {
});
}
+ @Test
+ public void testSameWALPrefix() throws IOException {
+ Set<String> latestWalsBefore =
+ manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
+ String walName1 = "localhost,8080,12345-45678-Peer.34567";
+ String walName2 = "localhost,8080,12345.56789";
+ manager.preLogRoll(new Path(walName1));
+ manager.preLogRoll(new Path(walName2));
+
+ Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
+ .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
+ assertEquals(2, latestWals.size());
+ assertTrue(latestWals.contains(walName1));
+ assertTrue(latestWals.contains(walName2));
+ }
+
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);