You are viewing a plain-text version of this content; the link to its canonical online version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/23 06:49:50 UTC

[iotdb] branch master updated: [IOTDB-2984] RatisConsensus Recovery Logic (#5648)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a27cd2e58b [IOTDB-2984] RatisConsensus Recovery Logic (#5648)
a27cd2e58b is described below

commit a27cd2e58b19517a9aadf8aac3850e3b796bec5d
Author: SzyWilliam <48...@users.noreply.github.com>
AuthorDate: Sat Apr 23 14:49:45 2022 +0800

    [IOTDB-2984] RatisConsensus Recovery Logic (#5648)
    
    * ratis recovery
    
    * ratis recovery
    
    * fix NPE
---
 .../iotdb/consensus/ratis/RatisConsensus.java      | 150 +++++++--------------
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   7 +-
 2 files changed, 54 insertions(+), 103 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 29b822da8c..f05ce751c9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -42,7 +42,6 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.GroupInfoReply;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -62,8 +61,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -78,9 +75,6 @@ class RatisConsensus implements IConsensus {
 
   private final RaftServer server;
 
-  private final Map<RaftGroupId, RaftClient> clientMap;
-  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
-
   private ClientId localFakeId;
   private AtomicLong localFakeCallId;
 
@@ -92,8 +86,6 @@ class RatisConsensus implements IConsensus {
   public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
       throws IOException {
 
-    this.clientMap = new ConcurrentHashMap<>();
-    this.raftGroupMap = new ConcurrentHashMap<>();
     this.localFakeId = ClientId.randomId();
     this.localFakeCallId = new AtomicLong(0);
 
@@ -139,7 +131,8 @@ class RatisConsensus implements IConsensus {
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
 
     // pre-condition: group exists and myself server serves this group
-    RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroup raftGroup = getGroupInfo(raftGroupId);
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
       return failedWrite(new ConsensusGroupNotExistException(groupId));
     }
@@ -169,11 +162,13 @@ class RatisConsensus implements IConsensus {
     }
 
     // 2. try raft client
-    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    RaftClient client = null;
     TSStatus writeResult;
     try {
+      client = buildClient(raftGroup);
       RaftClientReply reply = client.io().send(message);
       writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+      client.close();
     } catch (IOException | TException e) {
       return failedWrite(new RatisRequestFailedException(e));
     }
@@ -190,7 +185,7 @@ class RatisConsensus implements IConsensus {
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
 
-    RaftGroup group = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    RaftGroup group = getGroupInfo(Utils.toRatisGroupId(groupId));
     if (group == null || !group.getPeers().contains(myself)) {
       return failedRead(new ConsensusGroupNotExistException(groupId));
     }
@@ -228,15 +223,15 @@ class RatisConsensus implements IConsensus {
     if (!group.getPeers().contains(myself)) {
       return failed(new ConsensusGroupNotExistException(groupId));
     }
-    raftGroupMap.put(group.getGroupId(), group);
 
     // build and store the corresponding client
-    RaftClient client = buildClientAndCache(group);
+    RaftClient client = buildClient(group);
 
     // add RaftPeer myself to this RaftGroup
     RaftClientReply reply;
     try {
       reply = client.getGroupManagementApi(myself.getId()).add(group);
+      client.close();
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
     }
@@ -254,28 +249,24 @@ class RatisConsensus implements IConsensus {
   @Override
   public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
+    RaftGroup raftGroup = getGroupInfo(raftGroupId);
 
     // pre-conditions: group exists and myself in this group
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
       return failed(new PeerNotInConsensusGroupException(groupId, myself));
     }
 
-    RaftClient client = clientMap.get(raftGroupId);
+    RaftClient client;
     // send remove group to myself
     RaftClientReply reply;
     try {
+      client = buildClient(raftGroup);
       reply = client.getGroupManagementApi(myself.getId()).remove(raftGroupId, false, false);
+      client.close();
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
     }
 
-    if (reply.isSuccess()) {
-      // delete Group information and its corresponding client
-      raftGroupMap.remove(raftGroupId);
-      closeRaftClient(raftGroupId);
-      clientMap.remove(raftGroupId);
-    }
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
@@ -288,12 +279,7 @@ class RatisConsensus implements IConsensus {
   @Override
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    try {
-      syncGroupInfoAndRebuildClient(groupId);
-    } catch (ConsensusGroupNotExistException e) {
-      return failed(e);
-    }
-    RaftGroup group = raftGroupMap.get(raftGroupId);
+    RaftGroup group = getGroupInfo(raftGroupId);
     RaftPeer peerToAdd = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
@@ -311,11 +297,8 @@ class RatisConsensus implements IConsensus {
 
     RaftClientReply reply;
     try {
-      reply = sendReconfiguration(raftGroupId, newConfig);
-
-      // sync again
-      syncGroupInfoAndRebuildClient(groupId);
-    } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+      reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
+    } catch (RatisRequestFailedException e) {
       return failed(e);
     }
 
@@ -331,12 +314,7 @@ class RatisConsensus implements IConsensus {
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    try {
-      syncGroupInfoAndRebuildClient(groupId);
-    } catch (ConsensusGroupNotExistException e) {
-      return failed(e);
-    }
-    RaftGroup group = raftGroupMap.get(raftGroupId);
+    RaftGroup group = getGroupInfo(raftGroupId);
     RaftPeer peerToRemove = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
 
     // pre-conditions: group exists and myself in this group
@@ -356,10 +334,8 @@ class RatisConsensus implements IConsensus {
 
     RaftClientReply reply;
     try {
-      reply = sendReconfiguration(raftGroupId, newConfig);
-      // sync again
-      syncGroupInfoAndRebuildClient(groupId);
-    } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+      reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
+    } catch (RatisRequestFailedException e) {
       return failed(e);
     }
 
@@ -374,18 +350,15 @@ class RatisConsensus implements IConsensus {
     if (!raftGroup.getPeers().contains(myself)) {
       return failed(new ConsensusGroupNotExistException(groupId));
     }
-    raftGroupMap.put(raftGroup.getGroupId(), raftGroup);
 
     // build the client and store it
-    buildClientAndCache(raftGroup);
+    buildClient(raftGroup);
 
     // add RaftPeer myself to this RaftGroup
     RaftClientReply reply;
     try {
-      reply = sendReconfiguration(raftGroup.getGroupId(), new ArrayList<>(raftGroup.getPeers()));
-      // sync again
-      syncGroupInfoAndRebuildClient(groupId);
-    } catch (ConsensusGroupNotExistException | RatisRequestFailedException e) {
+      reply = sendReconfiguration(raftGroup);
+    } catch (RatisRequestFailedException e) {
       return failed(new RatisRequestFailedException(e));
     }
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -398,15 +371,14 @@ class RatisConsensus implements IConsensus {
     // So we have to enhance to leader's priority
 
     // first fetch the newest information
-    try {
-      syncGroupInfoAndRebuildClient(groupId);
-    } catch (ConsensusGroupNotExistException e) {
-      return failed(e);
-    }
 
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
-    RaftClient client = clientMap.get(raftGroupId);
+    RaftGroup raftGroup = getGroupInfo(raftGroupId);
+
+    if (raftGroup == null) {
+      return failed(new ConsensusGroupNotExistException(groupId));
+    }
+
     RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader, LEADER_PRIORITY);
 
     ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
@@ -419,6 +391,7 @@ class RatisConsensus implements IConsensus {
       }
     }
 
+    RaftClient client = buildClient(raftGroup);
     RaftClientReply reply = null;
     try {
       RaftClientReply configChangeReply = client.admin().setConfiguration(newConfiguration);
@@ -427,6 +400,7 @@ class RatisConsensus implements IConsensus {
       }
       // TODO tuning for timeoutMs
       reply = client.admin().transferLeadership(newRaftLeader.getId(), 2000);
+      client.close();
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
     }
@@ -455,8 +429,11 @@ class RatisConsensus implements IConsensus {
     }
 
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftClient client = clientMap.getOrDefault(raftGroupId, null);
-    if (client == null) {
+    RaftClient client = null;
+    try {
+      client = server.getDivision(raftGroupId).getRaftClient();
+    } catch (IOException e) {
+      logger.warn("cannot find raft client for group " + groupId);
       return null;
     }
     TEndPoint leaderEndpoint = Utils.parseFromRatisId(client.getLeaderId().toString());
@@ -492,6 +469,16 @@ class RatisConsensus implements IConsensus {
         .build();
   }
 
+  private RaftGroup getGroupInfo(RaftGroupId raftGroupId) {
+    RaftGroup raftGroup = null;
+    try {
+      raftGroup = server.getDivision(raftGroupId).getGroup();
+    } catch (IOException e) {
+      logger.debug("get group failed ", e);
+    }
+    return raftGroup;
+  }
+
   private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
     List<RaftPeer> raftPeers =
         peers.stream()
@@ -500,7 +487,7 @@ class RatisConsensus implements IConsensus {
     return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
   }
 
-  private RaftClient buildClientAndCache(RaftGroup group) {
+  private RaftClient buildClient(RaftGroup group) {
     RaftProperties raftProperties = new RaftProperties();
     RaftClient.Builder builder =
         RaftClient.newBuilder()
@@ -509,54 +496,17 @@ class RatisConsensus implements IConsensus {
             .setClientRpc(
                 new GrpcFactory(new Parameters())
                     .newRaftClientRpc(ClientId.randomId(), raftProperties));
-    RaftClient client = builder.build();
-    closeRaftClient(group.getGroupId());
-    clientMap.put(group.getGroupId(), client);
-    return client;
-  }
-
-  private void closeRaftClient(RaftGroupId groupId) {
-    RaftClient client = clientMap.get(groupId);
-    if (client != null) {
-      try {
-        client.close();
-      } catch (IOException exception) {
-        logger.warn("client for gid {} close failure {}", groupId, exception);
-      }
-    }
-  }
-
-  /**
-   * This function will use the previous client for groupId to query the latest group info It will
-   * update the new group info into the groupMap and rebuild its client
-   *
-   * @throws ConsensusGroupNotExistException when cannot get the group info
-   */
-  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
-      throws ConsensusGroupNotExistException {
-    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftClient current = clientMap.get(raftGroupId);
-    try {
-      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
-
-      if (!reply.isSuccess()) {
-        throw new ConsensusGroupNotExistException(groupId);
-      }
-
-      raftGroupMap.put(raftGroupId, reply.getGroup());
-      buildClientAndCache(raftGroupMap.get(raftGroupId));
-    } catch (IOException e) {
-      throw new ConsensusGroupNotExistException(groupId);
-    }
+    return builder.build();
   }
 
-  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+  private RaftClientReply sendReconfiguration(RaftGroup newGroupConf)
       throws RatisRequestFailedException {
-    RaftClient client = clientMap.get(raftGroupId);
+    RaftClient client = buildClient(newGroupConf);
     // notify the group leader of configuration change
     RaftClientReply reply;
     try {
-      reply = client.admin().setConfiguration(peers);
+      reply = client.admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
+      client.close();
     } catch (IOException e) {
       throw new RatisRequestFailedException(e);
     }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index f132dbbf01..ce9817b6f8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -190,15 +190,15 @@ public class RatisConsensusTest {
 
     // 6. Remove two Peers from Group (peer 0 and peer 2)
     // transfer the leader to peer1
-    // servers.get(0).transferLeader(gid, peer1);
-    // Assert.assertTrue(servers.get(1).isLeader(gid));
+    servers.get(0).transferLeader(gid, peer1);
+    Assert.assertTrue(servers.get(1).isLeader(gid));
     // first use removePeer to inform the group leader of configuration change
     servers.get(1).removePeer(gid, peer0);
     servers.get(1).removePeer(gid, peer2);
     // then use removeConsensusGroup to clean up removed Consensus-Peer's states
     servers.get(0).removeConsensusGroup(gid);
     servers.get(2).removeConsensusGroup(gid);
-    Assert.assertEquals(servers.get(1).getLeader(gid), peers.get(1));
+    Assert.assertEquals(servers.get(1).getLeader(gid).getEndpoint(), peers.get(1).getEndpoint());
 
     // 7. try consensus again with one peer
     doConsensus(servers.get(1), gid, 10, 20);
@@ -215,6 +215,7 @@ public class RatisConsensusTest {
     doConsensus(servers.get(2), gid, 10, 30);
 
     // 10. again, group contains only peer0
+    servers.get(0).transferLeader(group.getGroupId(), peer0);
     servers.get(0).changePeer(group.getGroupId(), Collections.singletonList(peer0));
     servers.get(1).removeConsensusGroup(group.getGroupId());
     servers.get(2).removeConsensusGroup(group.getGroupId());