You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/09 13:36:41 UTC
[iotdb] branch master updated: optimize transfer leader implementation (#7923)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 887e49fb2b optimize transfer leader implementation (#7923)
887e49fb2b is described below
commit 887e49fb2b6d403b0051f4c6ced917ef98560354
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Nov 9 21:36:35 2022 +0800
optimize transfer leader implementation (#7923)
---
.../iotdb/consensus/ratis/RatisConsensus.java | 23 ++++++++++++++--------
.../iotdb/consensus/ratis/RatisConsensusTest.java | 18 +++++++++++++++++
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 1e418f9643..ccefcb684e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -570,7 +570,7 @@ class RatisConsensus implements IConsensus {
return failed(new RatisRequestFailedException(configChangeReply.getException()));
}
- reply = forceStepDownLeader(raftGroup);
+ reply = transferLeader(raftGroup, newRaftLeader);
if (!reply.isSuccess()) {
return failed(new RatisRequestFailedException(reply.getException()));
}
@@ -584,15 +584,21 @@ class RatisConsensus implements IConsensus {
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
}
- // TODO when Ratis implements read leader transfer mechanism, change this implementation
- private RaftClientReply forceStepDownLeader(RaftGroup group) throws IOException {
+ private void forceStepDownLeader(RaftGroup group) throws IOException {
+ // when newLeaderPeerId == null, Ratis forces the current leader to step down and triggers a
+ // new election
+ transferLeader(group, null);
+ }
+
+ private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws IOException {
RatisClient client = null;
try {
client = getRaftClient(group);
// TODO tuning for timeoutMs
- // when newLeaderPeerId == null, ratis forces current leader to step down and raise new
- // election
- return client.getRaftClient().admin().transferLeadership(null, 5000);
+ return client
+ .getRaftClient()
+ .admin()
+ .transferLeadership(newLeader != null ? newLeader.getId() : null, 10000);
} finally {
if (client != null) {
client.returnSelf();
@@ -700,16 +706,17 @@ class RatisConsensus implements IConsensus {
}
private ConsensusGenericResponse failed(ConsensusException e) {
+ logger.error("{} request failed with exception {}", this, e);
return ConsensusGenericResponse.newBuilder().setSuccess(false).setException(e).build();
}
private ConsensusWriteResponse failedWrite(ConsensusException e) {
- logger.error("write request failed with exception", e);
+ logger.error("{} write request failed with exception {}", this, e);
return ConsensusWriteResponse.newBuilder().setException(e).build();
}
private ConsensusReadResponse failedRead(ConsensusException e) {
- logger.error("read request failed with exception", e);
+ logger.error("{} read request failed with exception {}", this, e);
return ConsensusReadResponse.newBuilder().setException(e).build();
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 041b9a0172..877f867585 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -221,6 +221,24 @@ public class RatisConsensusTest {
doConsensus(servers.get(0), gid, 10, 210);
}
+ @Test
+ public void transferLeader() throws Exception {
+ servers.get(0).createPeer(group.getGroupId(), group.getPeers());
+ servers.get(1).createPeer(group.getGroupId(), group.getPeers());
+ servers.get(2).createPeer(group.getGroupId(), group.getPeers());
+
+ doConsensus(servers.get(0), group.getGroupId(), 10, 10);
+
+ int leaderIndex = servers.get(0).getLeader(group.getGroupId()).getNodeId() - 1;
+
+ ConsensusGenericResponse resp =
+ servers.get(0).transferLeader(group.getGroupId(), peers.get((leaderIndex + 1) % 3));
+ Assert.assertTrue(resp.isSuccess());
+
+ int newLeaderIndex = servers.get(0).getLeader(group.getGroupId()).getNodeId() - 1;
+ Assert.assertEquals((leaderIndex + 1) % 3, newLeaderIndex);
+ }
+
private void doConsensus(IConsensus consensus, ConsensusGroupId gid, int count, int target)
throws Exception {