You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/03 10:11:44 UTC
[iotdb] branch issue_3090 created (now c3da6cc90b)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a change to branch issue_3090
in repository https://gitbox.apache.org/repos/asf/iotdb.git
at c3da6cc90b fix
This branch includes the following new commits:
new c3da6cc90b fix
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[iotdb] 01/01: fix
Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch issue_3090
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c3da6cc90b468268d07acf425ee1350a6b4fadef
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue May 3 18:11:29 2022 +0800
fix
---
.../iotdb/consensus/ratis/RatisConsensus.java | 75 ++++++++++++++++------
1 file changed, 54 insertions(+), 21 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index f8d65fa0b0..96b3b766e3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -94,7 +94,7 @@ class RatisConsensus implements IConsensus {
new IClientManager.Factory<RaftGroup, RatisClient>()
.createClientManager(new RatisClientPoolFactory());
- private Map<RaftGroupId, RaftGroup> lastSeen;
+ private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
private final ClientId localFakeId = ClientId.randomId();
private final AtomicLong localFakeCallId = new AtomicLong(0);
@@ -108,9 +108,6 @@ class RatisConsensus implements IConsensus {
*/
public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
throws IOException {
- lastSeen = new ConcurrentHashMap<>();
-
- // create a RaftPeer as endpoint of comm
String address = Utils.IPAddress(endpoint);
myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
@@ -174,7 +171,6 @@ class RatisConsensus implements IConsensus {
TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
}
-
NotLeaderException ex = localServerReply.getNotLeaderException();
if (ex != null) { // local server is not leader
suggestedLeader = ex.getSuggestedLeader();
@@ -185,13 +181,20 @@ class RatisConsensus implements IConsensus {
// 2. try raft client
TSStatus writeResult;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(raftGroup);
+ client = getRaftClient(raftGroup);
RaftClientReply reply = client.getRaftClient().io().send(message);
+ if (!reply.isSuccess()) {
+ return failedWrite(new RatisRequestFailedException(reply.getException()));
+ }
writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
- client.returnSelf();
} catch (IOException | TException e) {
return failedWrite(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
if (suggestedLeader != null) {
@@ -214,11 +217,12 @@ class RatisConsensus implements IConsensus {
RaftClientReply reply;
try {
RequestMessage message = new RequestMessage(IConsensusRequest);
-
RaftClientRequest clientRequest =
- buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(0));
-
+ buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(-1));
reply = server.submitClientRequest(clientRequest);
+ if (!reply.isSuccess()) {
+ return failedRead(new RatisRequestFailedException(reply.getException()));
+ }
} catch (IOException e) {
return failedRead(new RatisRequestFailedException(e));
}
@@ -247,12 +251,19 @@ class RatisConsensus implements IConsensus {
// add RaftPeer myself to this RaftGroup
RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(group);
+ client = getRaftClient(group);
reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
- client.returnSelf();
+ if (!reply.isSuccess()) {
+ return failed(new RatisRequestFailedException(reply.getException()));
+ }
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -277,16 +288,23 @@ class RatisConsensus implements IConsensus {
// send remove group to myself
RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(raftGroup);
+ client = getRaftClient(raftGroup);
reply =
client
.getRaftClient()
.getGroupManagementApi(myself.getId())
.remove(raftGroupId, false, false);
- client.returnSelf();
+ if (!reply.isSuccess()) {
+ return failed(new RatisRequestFailedException(reply.getException()));
+ }
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
@@ -414,19 +432,27 @@ class RatisConsensus implements IConsensus {
}
}
- RaftClientReply reply = null;
+ RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(raftGroup);
+ client = getRaftClient(raftGroup);
RaftClientReply configChangeReply =
client.getRaftClient().admin().setConfiguration(newConfiguration);
if (!configChangeReply.isSuccess()) {
return failed(new RatisRequestFailedException(configChangeReply.getException()));
}
// TODO tuning for timeoutMs
- reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 2000);
- client.returnSelf();
+ reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 5000);
+ if (!reply.isSuccess()) {
+ return failed(new RatisRequestFailedException(reply.getException()));
+ }
+
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
}
@@ -453,7 +479,7 @@ class RatisConsensus implements IConsensus {
}
RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
- RaftClient client = null;
+ RaftClient client;
try {
client = server.getDivision(raftGroupId).getRaftClient();
} catch (IOException e) {
@@ -531,13 +557,20 @@ class RatisConsensus implements IConsensus {
throws RatisRequestFailedException {
// notify the group leader of configuration change
RaftClientReply reply;
+ RatisClient client = null;
try {
- RatisClient client = getRaftClient(newGroupConf);
+ client = getRaftClient(newGroupConf);
reply =
client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
- client.returnSelf();
+ if (!reply.isSuccess()) {
+ throw new RatisRequestFailedException(reply.getException());
+ }
} catch (IOException e) {
throw new RatisRequestFailedException(e);
+ } finally {
+ if (client != null) {
+ client.returnSelf();
+ }
}
return reply;
}