You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/04 03:21:38 UTC

[iotdb] branch master updated: fix (#5784)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b943e4cd3 fix (#5784)
2b943e4cd3 is described below

commit 2b943e4cd3c3f65d1ee557660a399fc0885d3c97
Author: Potato <TX...@gmail.com>
AuthorDate: Wed May 4 11:21:34 2022 +0800

    fix (#5784)
---
 .../iotdb/consensus/ratis/RatisConsensus.java      | 75 ++++++++++++++++------
 1 file changed, 54 insertions(+), 21 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index f8d65fa0b0..96b3b766e3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -94,7 +94,7 @@ class RatisConsensus implements IConsensus {
       new IClientManager.Factory<RaftGroup, RatisClient>()
           .createClientManager(new RatisClientPoolFactory());
 
-  private Map<RaftGroupId, RaftGroup> lastSeen;
+  private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
 
   private final ClientId localFakeId = ClientId.randomId();
   private final AtomicLong localFakeCallId = new AtomicLong(0);
@@ -108,9 +108,6 @@ class RatisConsensus implements IConsensus {
    */
   public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
       throws IOException {
-    lastSeen = new ConcurrentHashMap<>();
-
-    // create a RaftPeer as endpoint of comm
     String address = Utils.IPAddress(endpoint);
     myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
 
@@ -174,7 +171,6 @@ class RatisConsensus implements IConsensus {
         TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
         return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
       }
-
       NotLeaderException ex = localServerReply.getNotLeaderException();
       if (ex != null) { // local server is not leader
         suggestedLeader = ex.getSuggestedLeader();
@@ -185,13 +181,20 @@ class RatisConsensus implements IConsensus {
 
     // 2. try raft client
     TSStatus writeResult;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(raftGroup);
+      client = getRaftClient(raftGroup);
       RaftClientReply reply = client.getRaftClient().io().send(message);
+      if (!reply.isSuccess()) {
+        return failedWrite(new RatisRequestFailedException(reply.getException()));
+      }
       writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
-      client.returnSelf();
     } catch (IOException | TException e) {
       return failedWrite(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
 
     if (suggestedLeader != null) {
@@ -214,11 +217,12 @@ class RatisConsensus implements IConsensus {
     RaftClientReply reply;
     try {
       RequestMessage message = new RequestMessage(IConsensusRequest);
-
       RaftClientRequest clientRequest =
-          buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(0));
-
+          buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(-1));
       reply = server.submitClientRequest(clientRequest);
+      if (!reply.isSuccess()) {
+        return failedRead(new RatisRequestFailedException(reply.getException()));
+      }
     } catch (IOException e) {
       return failedRead(new RatisRequestFailedException(e));
     }
@@ -247,12 +251,19 @@ class RatisConsensus implements IConsensus {
 
     // add RaftPeer myself to this RaftGroup
     RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(group);
+      client = getRaftClient(group);
       reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
-      client.returnSelf();
+      if (!reply.isSuccess()) {
+        return failed(new RatisRequestFailedException(reply.getException()));
+      }
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
 
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -277,16 +288,23 @@ class RatisConsensus implements IConsensus {
 
     // send remove group to myself
     RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(raftGroup);
+      client = getRaftClient(raftGroup);
       reply =
           client
               .getRaftClient()
               .getGroupManagementApi(myself.getId())
               .remove(raftGroupId, false, false);
-      client.returnSelf();
+      if (!reply.isSuccess()) {
+        return failed(new RatisRequestFailedException(reply.getException()));
+      }
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
 
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -414,19 +432,27 @@ class RatisConsensus implements IConsensus {
       }
     }
 
-    RaftClientReply reply = null;
+    RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(raftGroup);
+      client = getRaftClient(raftGroup);
       RaftClientReply configChangeReply =
           client.getRaftClient().admin().setConfiguration(newConfiguration);
       if (!configChangeReply.isSuccess()) {
         return failed(new RatisRequestFailedException(configChangeReply.getException()));
       }
       // TODO tuning for timeoutMs
-      reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 2000);
-      client.returnSelf();
+      reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 5000);
+      if (!reply.isSuccess()) {
+        return failed(new RatisRequestFailedException(reply.getException()));
+      }
+
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
@@ -453,7 +479,7 @@ class RatisConsensus implements IConsensus {
     }
 
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftClient client = null;
+    RaftClient client;
     try {
       client = server.getDivision(raftGroupId).getRaftClient();
     } catch (IOException e) {
@@ -531,13 +557,20 @@ class RatisConsensus implements IConsensus {
       throws RatisRequestFailedException {
     // notify the group leader of configuration change
     RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(newGroupConf);
+      client = getRaftClient(newGroupConf);
       reply =
           client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
-      client.returnSelf();
+      if (!reply.isSuccess()) {
+        throw new RatisRequestFailedException(reply.getException());
+      }
     } catch (IOException e) {
       throw new RatisRequestFailedException(e);
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
     return reply;
   }