You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/03 10:11:44 UTC

[iotdb] branch issue_3090 created (now c3da6cc90b)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch issue_3090
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at c3da6cc90b fix

This branch includes the following new commits:

     new c3da6cc90b fix

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch issue_3090
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c3da6cc90b468268d07acf425ee1350a6b4fadef
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue May 3 18:11:29 2022 +0800

    fix
---
 .../iotdb/consensus/ratis/RatisConsensus.java      | 75 ++++++++++++++++------
 1 file changed, 54 insertions(+), 21 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index f8d65fa0b0..96b3b766e3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -94,7 +94,7 @@ class RatisConsensus implements IConsensus {
       new IClientManager.Factory<RaftGroup, RatisClient>()
           .createClientManager(new RatisClientPoolFactory());
 
-  private Map<RaftGroupId, RaftGroup> lastSeen;
+  private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
 
   private final ClientId localFakeId = ClientId.randomId();
   private final AtomicLong localFakeCallId = new AtomicLong(0);
@@ -108,9 +108,6 @@ class RatisConsensus implements IConsensus {
    */
   public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
       throws IOException {
-    lastSeen = new ConcurrentHashMap<>();
-
-    // create a RaftPeer as endpoint of comm
     String address = Utils.IPAddress(endpoint);
     myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
 
@@ -174,7 +171,6 @@ class RatisConsensus implements IConsensus {
         TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
         return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
       }
-
       NotLeaderException ex = localServerReply.getNotLeaderException();
       if (ex != null) { // local server is not leader
         suggestedLeader = ex.getSuggestedLeader();
@@ -185,13 +181,20 @@ class RatisConsensus implements IConsensus {
 
     // 2. try raft client
     TSStatus writeResult;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(raftGroup);
+      client = getRaftClient(raftGroup);
       RaftClientReply reply = client.getRaftClient().io().send(message);
+      if (!reply.isSuccess()) {
+        return failedWrite(new RatisRequestFailedException(reply.getException()));
+      }
       writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
-      client.returnSelf();
     } catch (IOException | TException e) {
       return failedWrite(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
 
     if (suggestedLeader != null) {
@@ -214,11 +217,12 @@ class RatisConsensus implements IConsensus {
     RaftClientReply reply;
     try {
       RequestMessage message = new RequestMessage(IConsensusRequest);
-
       RaftClientRequest clientRequest =
-          buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(0));
-
+          buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(-1));
       reply = server.submitClientRequest(clientRequest);
+      if (!reply.isSuccess()) {
+        return failedRead(new RatisRequestFailedException(reply.getException()));
+      }
     } catch (IOException e) {
       return failedRead(new RatisRequestFailedException(e));
     }
@@ -247,12 +251,19 @@ class RatisConsensus implements IConsensus {
 
     // add RaftPeer myself to this RaftGroup
     RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(group);
+      client = getRaftClient(group);
       reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
-      client.returnSelf();
+      if (!reply.isSuccess()) {
+        return failed(new RatisRequestFailedException(reply.getException()));
+      }
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
 
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -277,16 +288,23 @@ class RatisConsensus implements IConsensus {
 
     // send remove group to myself
     RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(raftGroup);
+      client = getRaftClient(raftGroup);
       reply =
           client
               .getRaftClient()
               .getGroupManagementApi(myself.getId())
               .remove(raftGroupId, false, false);
-      client.returnSelf();
+      if (!reply.isSuccess()) {
+        return failed(new RatisRequestFailedException(reply.getException()));
+      }
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
 
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -414,19 +432,27 @@ class RatisConsensus implements IConsensus {
       }
     }
 
-    RaftClientReply reply = null;
+    RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(raftGroup);
+      client = getRaftClient(raftGroup);
       RaftClientReply configChangeReply =
           client.getRaftClient().admin().setConfiguration(newConfiguration);
       if (!configChangeReply.isSuccess()) {
         return failed(new RatisRequestFailedException(configChangeReply.getException()));
       }
       // TODO tuning for timeoutMs
-      reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 2000);
-      client.returnSelf();
+      reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 5000);
+      if (!reply.isSuccess()) {
+        return failed(new RatisRequestFailedException(reply.getException()));
+      }
+
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
     return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
@@ -453,7 +479,7 @@ class RatisConsensus implements IConsensus {
     }
 
     RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
-    RaftClient client = null;
+    RaftClient client;
     try {
       client = server.getDivision(raftGroupId).getRaftClient();
     } catch (IOException e) {
@@ -531,13 +557,20 @@ class RatisConsensus implements IConsensus {
       throws RatisRequestFailedException {
     // notify the group leader of configuration change
     RaftClientReply reply;
+    RatisClient client = null;
     try {
-      RatisClient client = getRaftClient(newGroupConf);
+      client = getRaftClient(newGroupConf);
       reply =
           client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
-      client.returnSelf();
+      if (!reply.isSuccess()) {
+        throw new RatisRequestFailedException(reply.getException());
+      }
     } catch (IOException e) {
       throw new RatisRequestFailedException(e);
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
     }
     return reply;
   }