You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/04 03:21:38 UTC
[iotdb] branch master updated: fix (#5784)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2b943e4cd3 fix (#5784)
2b943e4cd3 is described below
commit 2b943e4cd3c3f65d1ee557660a399fc0885d3c97
Author: Potato <TX...@gmail.com>
AuthorDate: Wed May 4 11:21:34 2022 +0800
fix (#5784)
---
.../iotdb/consensus/ratis/RatisConsensus.java | 75 ++++++++++++++++------
1 file changed, 54 insertions(+), 21 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index f8d65fa0b0..96b3b766e3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -94,7 +94,7 @@ class RatisConsensus implements IConsensus {
new IClientManager.Factory<RaftGroup, RatisClient>()
.createClientManager(new RatisClientPoolFactory());
- private Map<RaftGroupId, RaftGroup> lastSeen;
+ private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
private final ClientId localFakeId = ClientId.randomId();
private final AtomicLong localFakeCallId = new AtomicLong(0);
@@ -108,9 +108,6 @@ class RatisConsensus implements IConsensus {
*/
public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
throws IOException {
- lastSeen = new ConcurrentHashMap<>();
-
- // create a RaftPeer as endpoint of comm
String address = Utils.IPAddress(endpoint);
myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
@@ -174,7 +171,6 @@ class RatisConsensus implements IConsensus {
TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
}
-
NotLeaderException ex = localServerReply.getNotLeaderException();
if (ex != null) { // local server is not leader
suggestedLeader = ex.getSuggestedLeader();
@@ -185,13 +181,20 @@ class RatisConsensus implements IConsensus {
// 2. try raft client
TSStatus writeResult;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(raftGroup);
+ client = getRaftClient(raftGroup);
RaftClientReply reply = client.getRaftClient().io().send(message);
+ if (!reply.isSuccess()) {
+ return failedWrite(new RatisRequestFailedException(reply.getException()));
+ }
writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
- client.returnSelf();
} catch (IOException | TException e) {
return failedWrite(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
if (suggestedLeader != null) {
@@ -214,11 +217,12 @@ class RatisConsensus implements IConsensus {
RaftClientReply reply;
try {
RequestMessage message = new RequestMessage(IConsensusRequest);
-
RaftClientRequest clientRequest =
- buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(0));
-
+ buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(-1));
reply = server.submitClientRequest(clientRequest);
+ if (!reply.isSuccess()) {
+ return failedRead(new RatisRequestFailedException(reply.getException()));
+ }
} catch (IOException e) {
return failedRead(new RatisRequestFailedException(e));
}
@@ -247,12 +251,19 @@ class RatisConsensus implements IConsensus {
// add RaftPeer myself to this RaftGroup
RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(group);
+ client = getRaftClient(group);
reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
- client.returnSelf();
+ if (!reply.isSuccess()) {
+ return failed(new RatisRequestFailedException(reply.getException()));
+ }
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -277,16 +288,23 @@ class RatisConsensus implements IConsensus {
// send remove group to myself
RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(raftGroup);
+ client = getRaftClient(raftGroup);
reply =
client
.getRaftClient()
.getGroupManagementApi(myself.getId())
.remove(raftGroupId, false, false);
- client.returnSelf();
+ if (!reply.isSuccess()) {
+ return failed(new RatisRequestFailedException(reply.getException()));
+ }
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -414,19 +432,27 @@ class RatisConsensus implements IConsensus {
}
}
- RaftClientReply reply = null;
+ RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(raftGroup);
+ client = getRaftClient(raftGroup);
RaftClientReply configChangeReply =
client.getRaftClient().admin().setConfiguration(newConfiguration);
if (!configChangeReply.isSuccess()) {
return failed(new RatisRequestFailedException(configChangeReply.getException()));
}
// TODO tuning for timeoutMs
- reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 2000);
- client.returnSelf();
+ reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 5000);
+ if (!reply.isSuccess()) {
+ return failed(new RatisRequestFailedException(reply.getException()));
+ }
+
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
}
@@ -453,7 +479,7 @@ class RatisConsensus implements IConsensus {
}
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftClient client = null;
+ RaftClient client;
try {
client = server.getDivision(raftGroupId).getRaftClient();
} catch (IOException e) {
@@ -531,13 +557,20 @@ class RatisConsensus implements IConsensus {
throws RatisRequestFailedException {
// notify the group leader of configuration change
RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(newGroupConf);
+ client = getRaftClient(newGroupConf);
reply =
client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
- client.returnSelf();
+ if (!reply.isSuccess()) {
+ throw new RatisRequestFailedException(reply.getException());
+ }
} catch (IOException e) {
throw new RatisRequestFailedException(e);
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return reply;
}