You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text version.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/23 06:49:50 UTC
[iotdb] branch master updated: [IOTDB-2984] RatisConsensus Recovery Logic (#5648)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a27cd2e58b [IOTDB-2984] RatisConsensus Recovery Logic (#5648)
a27cd2e58b is described below
commit a27cd2e58b19517a9aadf8aac3850e3b796bec5d
Author: SzyWilliam <48...@users.noreply.github.com>
AuthorDate: Sat Apr 23 14:49:45 2022 +0800
[IOTDB-2984] RatisConsensus Recovery Logic (#5648)
* ratis recovery
* ratis recovery
* fix NPE
---
.../iotdb/consensus/ratis/RatisConsensus.java | 150 +++++++--------------
.../iotdb/consensus/ratis/RatisConsensusTest.java | 7 +-
2 files changed, 54 insertions(+), 103 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 29b822da8c..f05ce751c9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -42,7 +42,6 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -62,8 +61,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -78,9 +75,6 @@ class RatisConsensus implements IConsensus {
private final RaftServer server;
- private final Map<RaftGroupId, RaftClient> clientMap;
- private final Map<RaftGroupId, RaftGroup> raftGroupMap;
-
private ClientId localFakeId;
private AtomicLong localFakeCallId;
@@ -92,8 +86,6 @@ class RatisConsensus implements IConsensus {
public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
throws IOException {
- this.clientMap = new ConcurrentHashMap<>();
- this.raftGroupMap = new ConcurrentHashMap<>();
this.localFakeId = ClientId.randomId();
this.localFakeCallId = new AtomicLong(0);
@@ -139,7 +131,8 @@ class RatisConsensus implements IConsensus {
ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
// pre-condition: group exists and myself server serves this group
- RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+ RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroup raftGroup = getGroupInfo(raftGroupId);
if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
return failedWrite(new ConsensusGroupNotExistException(groupId));
}
@@ -169,11 +162,13 @@ class RatisConsensus implements IConsensus {
}
// 2. try raft client
- RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+ RaftClient client = null;
TSStatus writeResult;
try {
+ client = buildClient(raftGroup);
RaftClientReply reply = client.io().send(message);
writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+ client.close();
} catch (IOException | TException e) {
return failedWrite(new RatisRequestFailedException(e));
}
@@ -190,7 +185,7 @@ class RatisConsensus implements IConsensus {
@Override
public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
- RaftGroup group = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+ RaftGroup group = getGroupInfo(Utils.toRatisGroupId(groupId));
if (group == null || !group.getPeers().contains(myself)) {
return failedRead(new ConsensusGroupNotExistException(groupId));
}
@@ -228,15 +223,15 @@ class RatisConsensus implements IConsensus {
if (!group.getPeers().contains(myself)) {
return failed(new ConsensusGroupNotExistException(groupId));
}
- raftGroupMap.put(group.getGroupId(), group);
// build and store the corresponding client
- RaftClient client = buildClientAndCache(group);
+ RaftClient client = buildClient(group);
// add RaftPeer myself to this RaftGroup
RaftClientReply reply;
try {
reply = client.getGroupManagementApi(myself.getId()).add(group);
+ client.close();
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
}
@@ -254,28 +249,24 @@ class RatisConsensus implements IConsensus {
@Override
public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
+ RaftGroup raftGroup = getGroupInfo(raftGroupId);
// pre-conditions: group exists and myself in this group
if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
return failed(new PeerNotInConsensusGroupException(groupId, myself));
}
- RaftClient client = clientMap.get(raftGroupId);
+ RaftClient client;
// send remove group to myself
RaftClientReply reply;
try {
+ client = buildClient(raftGroup);
reply = client.getGroupManagementApi(myself.getId()).remove(raftGroupId, false, false);
+ client.close();
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
}
- if (reply.isSuccess()) {
- // delete Group information and its corresponding client
- raftGroupMap.remove(raftGroupId);
- closeRaftClient(raftGroupId);
- clientMap.remove(raftGroupId);
- }
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
}
@@ -288,12 +279,7 @@ class RatisConsensus implements IConsensus {
@Override
public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- try {
- syncGroupInfoAndRebuildClient(groupId);
- } catch (ConsensusGroupNotExistException e) {
- return failed(e);
- }
- RaftGroup group = raftGroupMap.get(raftGroupId);
+ RaftGroup group = getGroupInfo(raftGroupId);
RaftPeer peerToAdd = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
@@ -311,11 +297,8 @@ class RatisConsensus implements IConsensus {
RaftClientReply reply;
try {
- reply = sendReconfiguration(raftGroupId, newConfig);
-
- // sync again
- syncGroupInfoAndRebuildClient(groupId);
- } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+ reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
+ } catch (RatisRequestFailedException e) {
return failed(e);
}
@@ -331,12 +314,7 @@ class RatisConsensus implements IConsensus {
@Override
public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- try {
- syncGroupInfoAndRebuildClient(groupId);
- } catch (ConsensusGroupNotExistException e) {
- return failed(e);
- }
- RaftGroup group = raftGroupMap.get(raftGroupId);
+ RaftGroup group = getGroupInfo(raftGroupId);
RaftPeer peerToRemove = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
@@ -356,10 +334,8 @@ class RatisConsensus implements IConsensus {
RaftClientReply reply;
try {
- reply = sendReconfiguration(raftGroupId, newConfig);
- // sync again
- syncGroupInfoAndRebuildClient(groupId);
- } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+ reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
+ } catch (RatisRequestFailedException e) {
return failed(e);
}
@@ -374,18 +350,15 @@ class RatisConsensus implements IConsensus {
if (!raftGroup.getPeers().contains(myself)) {
return failed(new ConsensusGroupNotExistException(groupId));
}
- raftGroupMap.put(raftGroup.getGroupId(), raftGroup);
// build the client and store it
- buildClientAndCache(raftGroup);
+ buildClient(raftGroup);
// add RaftPeer myself to this RaftGroup
RaftClientReply reply;
try {
- reply = sendReconfiguration(raftGroup.getGroupId(), new ArrayList<>(raftGroup.getPeers()));
- // sync again
- syncGroupInfoAndRebuildClient(groupId);
- } catch (ConsensusGroupNotExistException | RatisRequestFailedException e) {
+ reply = sendReconfiguration(raftGroup);
+ } catch (RatisRequestFailedException e) {
return failed(new RatisRequestFailedException(e));
}
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -398,15 +371,14 @@ class RatisConsensus implements IConsensus {
// So we have to enhance to leader's priority
// first fetch the newest information
- try {
- syncGroupInfoAndRebuildClient(groupId);
- } catch (ConsensusGroupNotExistException e) {
- return failed(e);
- }
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
- RaftClient client = clientMap.get(raftGroupId);
+ RaftGroup raftGroup = getGroupInfo(raftGroupId);
+
+ if (raftGroup == null) {
+ return failed(new ConsensusGroupNotExistException(groupId));
+ }
+
RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader, LEADER_PRIORITY);
ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
@@ -419,6 +391,7 @@ class RatisConsensus implements IConsensus {
}
}
+ RaftClient client = buildClient(raftGroup);
RaftClientReply reply = null;
try {
RaftClientReply configChangeReply = client.admin().setConfiguration(newConfiguration);
@@ -427,6 +400,7 @@ class RatisConsensus implements IConsensus {
}
// TODO tuning for timeoutMs
reply = client.admin().transferLeadership(newRaftLeader.getId(), 2000);
+ client.close();
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
}
@@ -455,8 +429,11 @@ class RatisConsensus implements IConsensus {
}
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftClient client = clientMap.getOrDefault(raftGroupId, null);
- if (client == null) {
+ RaftClient client = null;
+ try {
+ client = server.getDivision(raftGroupId).getRaftClient();
+ } catch (IOException e) {
+ logger.warn("cannot find raft client for group " + groupId);
return null;
}
TEndPoint leaderEndpoint = Utils.parseFromRatisId(client.getLeaderId().toString());
@@ -492,6 +469,16 @@ class RatisConsensus implements IConsensus {
.build();
}
+ private RaftGroup getGroupInfo(RaftGroupId raftGroupId) {
+ RaftGroup raftGroup = null;
+ try {
+ raftGroup = server.getDivision(raftGroupId).getGroup();
+ } catch (IOException e) {
+ logger.debug("get group failed ", e);
+ }
+ return raftGroup;
+ }
+
private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
List<RaftPeer> raftPeers =
peers.stream()
@@ -500,7 +487,7 @@ class RatisConsensus implements IConsensus {
return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
}
- private RaftClient buildClientAndCache(RaftGroup group) {
+ private RaftClient buildClient(RaftGroup group) {
RaftProperties raftProperties = new RaftProperties();
RaftClient.Builder builder =
RaftClient.newBuilder()
@@ -509,54 +496,17 @@ class RatisConsensus implements IConsensus {
.setClientRpc(
new GrpcFactory(new Parameters())
.newRaftClientRpc(ClientId.randomId(), raftProperties));
- RaftClient client = builder.build();
- closeRaftClient(group.getGroupId());
- clientMap.put(group.getGroupId(), client);
- return client;
- }
-
- private void closeRaftClient(RaftGroupId groupId) {
- RaftClient client = clientMap.get(groupId);
- if (client != null) {
- try {
- client.close();
- } catch (IOException exception) {
- logger.warn("client for gid {} close failure {}", groupId, exception);
- }
- }
- }
-
- /**
- * This function will use the previous client for groupId to query the latest group info It will
- * update the new group info into the groupMap and rebuild its client
- *
- * @throws ConsensusGroupNotExistException when cannot get the group info
- */
- private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
- throws ConsensusGroupNotExistException {
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftClient current = clientMap.get(raftGroupId);
- try {
- GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
-
- if (!reply.isSuccess()) {
- throw new ConsensusGroupNotExistException(groupId);
- }
-
- raftGroupMap.put(raftGroupId, reply.getGroup());
- buildClientAndCache(raftGroupMap.get(raftGroupId));
- } catch (IOException e) {
- throw new ConsensusGroupNotExistException(groupId);
- }
+ return builder.build();
}
- private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+ private RaftClientReply sendReconfiguration(RaftGroup newGroupConf)
throws RatisRequestFailedException {
- RaftClient client = clientMap.get(raftGroupId);
+ RaftClient client = buildClient(newGroupConf);
// notify the group leader of configuration change
RaftClientReply reply;
try {
- reply = client.admin().setConfiguration(peers);
+ reply = client.admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
+ client.close();
} catch (IOException e) {
throw new RatisRequestFailedException(e);
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index f132dbbf01..ce9817b6f8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -190,15 +190,15 @@ public class RatisConsensusTest {
// 6. Remove two Peers from Group (peer 0 and peer 2)
// transfer the leader to peer1
- // servers.get(0).transferLeader(gid, peer1);
- // Assert.assertTrue(servers.get(1).isLeader(gid));
+ servers.get(0).transferLeader(gid, peer1);
+ Assert.assertTrue(servers.get(1).isLeader(gid));
// first use removePeer to inform the group leader of configuration change
servers.get(1).removePeer(gid, peer0);
servers.get(1).removePeer(gid, peer2);
// then use removeConsensusGroup to clean up removed Consensus-Peer's states
servers.get(0).removeConsensusGroup(gid);
servers.get(2).removeConsensusGroup(gid);
- Assert.assertEquals(servers.get(1).getLeader(gid), peers.get(1));
+ Assert.assertEquals(servers.get(1).getLeader(gid).getEndpoint(), peers.get(1).getEndpoint());
// 7. try consensus again with one peer
doConsensus(servers.get(1), gid, 10, 20);
@@ -215,6 +215,7 @@ public class RatisConsensusTest {
doConsensus(servers.get(2), gid, 10, 30);
// 10. again, group contains only peer0
+ servers.get(0).transferLeader(group.getGroupId(), peer0);
servers.get(0).changePeer(group.getGroupId(), Collections.singletonList(peer0));
servers.get(1).removeConsensusGroup(group.getGroupId());
servers.get(2).removeConsensusGroup(group.getGroupId());