You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/11/13 14:25:59 UTC
[hbase] 04/08: HBASE-27215 Add support for sync replication (#4762)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 05ae3edc10f5f48028393c276e59faa2d3b28383
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Sep 15 22:58:29 2022 +0800
HBASE-27215 Add support for sync replication (#4762)
Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
.../regionserver/ReplicationSource.java | 2 +-
.../regionserver/ReplicationSourceManager.java | 53 +++++++++++-----------
.../TestDrainReplicationQueuesForStandBy.java | 3 --
3 files changed, 28 insertions(+), 30 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 45b66bcb1dc..788fb4871c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -465,7 +465,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
t.getName());
manager.refreshSources(peerId);
break;
- } catch (IOException e1) {
+ } catch (IOException | ReplicationException e1) {
LOG.error("Replication sources refresh failed.", e1);
sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index e3745a7c2e3..bbb712690b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -403,38 +403,44 @@ public class ReplicationSourceManager {
// TODO: use empty initial offsets for now, revisit when adding support for sync replication
ReplicationSourceInterface src =
createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
- // synchronized here to avoid race with preLogRoll where we add new log to source and also
+ // synchronized here to avoid race with postLogRoll where we add new log to source and also
// walsById.
ReplicationSourceInterface toRemove;
- Map<String, NavigableSet<String>> wals = new HashMap<>();
+ ReplicationQueueData queueData;
synchronized (latestPaths) {
+ // Here we make a copy of all the remaining wal files and then delete them from the
+ // replication queue storage after releasing the lock. It is not safe to just remove the old
+ // map from walsById since later we may fail to update the replication queue storage, and when
+ // we retry next time, we can not know the wal files that needs to be set to the replication
+ // queue storage
+ ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
+ synchronized (walsById) {
+ walsById.get(queueId).forEach((group, wals) -> {
+ if (!wals.isEmpty()) {
+ builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
+ }
+ });
+ }
+ queueData = new ReplicationQueueData(queueId, builder.build());
+ src = createSource(queueData, peer);
toRemove = sources.put(peerId, src);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage);
toRemove.getSourceMetrics().clear();
}
- // Here we make a copy of all the remaining wal files and then delete them from the
- // replication queue storage after releasing the lock. It is not safe to just remove the old
- // map from walsById since later we may fail to delete them from the replication queue
- // storage, and when we retry next time, we can not know the wal files that need to be deleted
- // from the replication queue storage.
- walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+ }
+ for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
+ queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
- for (NavigableSet<String> walsByGroup : wals.values()) {
- // TODO: just need to reset the replication offset
- // for (String wal : walsByGroup) {
- // queueStorage.removeWAL(server.getServerName(), peerId, wal);
- // }
- }
synchronized (walsById) {
- Map<String, NavigableSet<String>> oldWals = walsById.get(queueId);
- wals.forEach((k, v) -> {
- NavigableSet<String> walsByGroup = oldWals.get(k);
+ Map<String, NavigableSet<String>> wals = walsById.get(queueId);
+ queueData.getOffsets().forEach((group, offset) -> {
+ NavigableSet<String> walsByGroup = wals.get(group);
if (walsByGroup != null) {
- walsByGroup.removeAll(v);
+ walsByGroup.headSet(offset.getWal(), true).clear();
}
});
}
@@ -457,13 +463,8 @@ public class ReplicationSourceManager {
}
private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
- ReplicationPeer peer) throws IOException {
- Map<String, ReplicationGroupOffset> offsets;
- try {
- offsets = queueStorage.getOffsets(queueId);
- } catch (ReplicationException e) {
- throw new IOException(e);
- }
+ ReplicationPeer peer) throws IOException, ReplicationException {
+ Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
}
@@ -473,7 +474,7 @@ public class ReplicationSourceManager {
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
*/
- public void refreshSources(String peerId) throws IOException {
+ public void refreshSources(String peerId) throws ReplicationException, IOException {
String terminateMessage = "Peer " + peerId
+ " state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
index 8918f8422e1..0189d475575 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
@@ -35,12 +35,9 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-// TODO: revisit later
-@Ignore
@Category({ ReplicationTests.class, MediumTests.class })
public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {