You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/06 03:25:18 UTC
[iotdb] branch native_raft updated: implement membership change functions
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 0381fb3f68 implement membership change functions
0381fb3f68 is described below
commit 0381fb3f68b5430703c0ec58d8db65edb9c5880a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 6 11:26:46 2023 +0800
implement membership change functions
---
.../iotdb/consensus/natraft/RaftConsensus.java | 59 +++++++--
.../natraft/client/SyncClientAdaptor.java | 8 ++
.../consensus/natraft/protocol/RaftMember.java | 136 +++++++++++++++++++--
.../natraft/protocol/log/applier/BaseApplier.java | 10 +-
.../log/dispatch/AppendNodeEntryHandler.java | 4 -
.../protocol/log/dispatch/LogDispatcher.java | 25 +++-
.../manager/DirectorySnapshotRaftLogManager.java | 7 +-
.../protocol/log/manager/RaftLogManager.java | 10 +-
.../serialization/SyncLogDequeSerializer.java | 10 --
.../natraft/service/RaftRPCServiceProcessor.java | 7 ++
.../iotdb/consensus/natraft/utils/NodeUtils.java | 1 +
.../iotdb/consensus/natraft/utils/StatusUtils.java | 18 +++
thrift-raft/src/main/thrift/raft.thrift | 22 ++--
13 files changed, 267 insertions(+), 50 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 3aad5abedc..8c3231d21c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
+import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -129,7 +130,8 @@ public class RaftConsensus implements IConsensus {
null,
consensusGroupId,
registry.apply(consensusGroupId),
- clientManager);
+ clientManager,
+ this::onMemberRemoved);
stateMachineMap.put(consensusGroupId, raftMember);
raftMember.start();
}
@@ -179,6 +181,9 @@ public class RaftConsensus implements IConsensus {
}
}
+ public void onMemberRemoved(ConsensusGroupId groupId) {
+ stateMachineMap.remove(groupId);
+ }
public boolean createNewMemberIfAbsent(ConsensusGroupId groupId, Peer thisPeer,
List<Peer> peers, List<Peer> newPeers) {
@@ -194,7 +199,8 @@ public class RaftConsensus implements IConsensus {
}
RaftMember impl =
new RaftMember(
- path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId), clientManager);
+ path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId),
+ clientManager, this::onMemberRemoved);
impl.start();
return impl;
});
@@ -250,32 +256,69 @@ public class RaftConsensus implements IConsensus {
@Override
public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
- return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ RaftMember impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return StatusUtils.toGenericResponse(impl.addPeer(peer));
}
@Override
public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
- return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ RaftMember impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return StatusUtils.toGenericResponse(impl.removePeer(peer));
}
@Override
public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) {
- return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+ RaftMember impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return StatusUtils.toGenericResponse(impl.updatePeer(oldPeer, newPeer));
}
@Override
public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
- return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ RaftMember impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return StatusUtils.toGenericResponse(impl.changeConfig(newPeers));
}
@Override
public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
- return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ RaftMember impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return StatusUtils.toGenericResponse(impl.transferLeader(newLeader));
}
@Override
public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
- return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ RaftMember impl = stateMachineMap.get(groupId);
+ if (impl == null) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ impl.triggerSnapshot();
+ return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index fef5e19c26..a5be4ae46a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -83,6 +83,14 @@ public class SyncClientAdaptor {
return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
}
+ public static TSStatus forceElection(
+ AsyncRaftServiceClient client, ConsensusGroupId groupId)
+ throws TException, InterruptedException {
+ GenericHandler<TSStatus> matchTermHandler = new GenericHandler<>(client.getEndpoint());
+ client.forceElection(groupId.convertToTConsensusGroupId(), matchTermHandler);
+ return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
+ }
+
public static ByteBuffer readFile(
AsyncRaftServiceClient client, String remotePath, long offset, int fetchSize)
throws InterruptedException, TException {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 02e14a5ce0..9ac0f1ed86 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -23,12 +23,14 @@
package org.apache.iotdb.consensus.natraft.protocol;
import java.util.Collections;
+import java.util.function.Consumer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
@@ -193,6 +195,7 @@ public class RaftMember {
protected LogSequencer logSequencer;
private volatile LogAppender logAppender;
private FlowBalancer flowBalancer;
+ private Consumer<ConsensusGroupId> onRemove;
public RaftMember(
String storageDir,
@@ -202,7 +205,8 @@ public class RaftMember {
List<Peer> newNodes,
ConsensusGroupId groupId,
IStateMachine stateMachine,
- IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager) {
+ IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager,
+ Consumer<ConsensusGroupId> onRemove) {
this.config = config;
this.storageDir = storageDir;
initConfig();
@@ -226,23 +230,67 @@ public class RaftMember {
this.clientManager = clientManager;
this.stateMachine = stateMachine;
- this.logManager =
- new DirectorySnapshotRaftLogManager(
- new SyncLogDequeSerializer(groupId, config),
- new AsyncLogApplier(new BaseApplier(stateMachine), name, config),
- name,
- stateMachine,
- config);
+
this.votingLogList = new VotingLogList(this);
this.logAppender = appenderFactory.create(this, config);
this.logSequencer = SEQUENCER_FACTORY.create(this, config);
this.logDispatcher = new LogDispatcher(this, config);
this.heartbeatReqHandler = new HeartbeatReqHandler(this);
this.electionReqHandler = new ElectionReqHandler(this);
+ this.logManager =
+ new DirectorySnapshotRaftLogManager(
+ new SyncLogDequeSerializer(groupId, config),
+ new AsyncLogApplier(new BaseApplier(stateMachine, this), name, config),
+ name,
+ stateMachine,
+ config,
+ this::examineUnappliedEntry);
+ this.onRemove = onRemove;
initPeerMap();
}
+ public void applyConfigChange(ConfigChangeEntry configChangeEntry) {
+ List<Peer> newNodes = configChangeEntry.getNewPeers();
+ if (!newNodes.equals(this.newNodes)) {
+ return;
+ }
+
+ if (newNodes.contains(thisNode)) {
+ applyNewNodes();
+ }
+ }
+
+ /**
+ * Promotes the pending configuration to the current one: asks the log dispatcher to apply its
+ * new nodes, replaces allNodes with newNodes, clears newNodes and persists the configuration.
+ * The whole switch runs under the log manager's write lock.
+ */
+ public void applyNewNodes() {
+ // Acquire the lock before entering the try block, so that the finally clause can never
+ // attempt to unlock a lock this thread failed to take.
+ logManager.getLock().writeLock().lock();
+ try {
+ logDispatcher.applyNewNodes();
+ allNodes = newNodes;
+ newNodes = null;
+ persistConfiguration();
+ } finally {
+ logManager.getLock().writeLock().unlock();
+ }
+ }
+
+ public void remove() {
+ stop();
+ FileUtils.deleteDirectory(new File(storageDir));
+ onRemove.accept(groupId);
+ }
+
+ private void examineUnappliedEntry(List<Entry> entries) {
+ ConfigChangeEntry configChangeEntry = null;
+ for (Entry entry : entries) {
+ if (entry instanceof ConfigChangeEntry) {
+ configChangeEntry = (ConfigChangeEntry) entry;
+ }
+ }
+ if (configChangeEntry != null) {
+ setNewNodes(configChangeEntry.getNewPeers());
+ }
+ }
+
public void recoverConfiguration() {
ByteBuffer buffer;
try {
@@ -1239,4 +1287,76 @@ public class RaftMember {
newPeers.add(newPeer);
return changeConfig(newPeers);
}
+
+ public TSStatus removePeer(Peer toRemove) {
+ List<Peer> allNodes = getAllNodes();
+ if (!allNodes.contains(toRemove)) {
+ return StatusUtils.OK.deepCopy().setMessage("Peer already removed");
+ }
+
+ List<Peer> newPeers = new ArrayList<>(allNodes);
+ newPeers.remove(toRemove);
+ return changeConfig(newPeers);
+ }
+
+ public TSStatus updatePeer(Peer oldPeer, Peer newPeer) {
+ List<Peer> allNodes = getAllNodes();
+ if (!allNodes.contains(oldPeer)) {
+ return StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy().setMessage("Peer already removed");
+ }
+ if (allNodes.contains(newPeer)) {
+ return StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy().setMessage("Peer already exists");
+ }
+
+ List<Peer> newPeers = new ArrayList<>(allNodes);
+ newPeers.remove(oldPeer);
+ newPeers.add(newPeer);
+ return changeConfig(newPeers);
+ }
+
+ public void triggerSnapshot() {
+ logManager.takeSnapshot(this);
+ }
+
+ /**
+ * Asks the given peer to take over leadership by sending it a forceElection request.
+ *
+ * @param peer the peer that should become the new leader
+ * @return OK if the target is this node; NO_LEADER if this node is not the leader;
+ * EXECUTE_STATEMENT_ERROR if the peer is not in allNodes; otherwise the status returned by the
+ * remote forceElection call
+ * @throws RuntimeException if the remote call fails or the waiting thread is interrupted
+ */
+ public TSStatus transferLeader(Peer peer) {
+ if (thisNode.equals(peer)) {
+ return StatusUtils.OK;
+ }
+ if (!isLeader()) {
+ return StatusUtils.NO_LEADER.deepCopy().setMessage("This node is not a leader");
+ }
+ if (!allNodes.contains(peer)) {
+ return StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy().setMessage("Peer not in this group");
+ }
+
+ AsyncRaftServiceClient client = getClient(peer.getEndpoint());
+ try {
+ return SyncClientAdaptor.forceElection(client, groupId);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Tries to make this node the leader and waits up to 10 seconds for that to happen.
+ *
+ * <p>Resets the last-heartbeat time to 0 and notifies the heartbeat thread, then polls
+ * isLeader() every 100 ms. NOTE(review): presumably the reset makes the heartbeat thread start
+ * an election immediately - confirm against HeartbeatThread.
+ *
+ * @return OK if this node is (or becomes) the leader, TIME_OUT if it does not become the leader
+ * within the wait limit or the waiting thread is interrupted
+ */
+ public TSStatus forceElection() {
+ if (isLeader()) {
+ return StatusUtils.OK;
+ }
+
+ heartbeatThread.setLastHeartbeatReceivedTime(0);
+ heartbeatThread.notifyHeartbeat();
+ long waitStart = System.currentTimeMillis();
+ long maxWait = 10_000L;
+ while (!isLeader() && (System.currentTimeMillis() - waitStart) < maxWait) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Keep the interrupt status visible to the caller instead of silently dropping it.
+ Thread.currentThread().interrupt();
+ return StatusUtils.TIME_OUT;
+ }
+ }
+ if (isLeader()) {
+ return StatusUtils.OK;
+ } else {
+ return StatusUtils.TIME_OUT;
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
index 6832400798..646159877f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
@@ -24,17 +24,21 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.RequestEntry;
import org.apache.iotdb.rpc.TSStatusCode;
/** BaseApplier use PlanExecutor to execute PhysicalPlans. */
public class BaseApplier implements LogApplier {
- IStateMachine stateMachine;
+ protected IStateMachine stateMachine;
+ protected RaftMember member;
- public BaseApplier(IStateMachine stateMachine) {
+ public BaseApplier(IStateMachine stateMachine, RaftMember member) {
this.stateMachine = stateMachine;
+ this.member = member;
}
@TestOnly
@@ -53,6 +57,8 @@ public class BaseApplier implements LogApplier {
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
e.setException(new ConsensusException(status.message + ":" + status.code));
}
+ } else if (e instanceof ConfigChangeEntry) {
+ member.applyConfigChange(((ConfigChangeEntry) e));
}
} catch (Exception ex) {
e.setException(ex);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 2b5d92d390..7b5f9f5dc2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -50,7 +50,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
protected RaftMember member;
protected VotingEntry log;
protected Peer directReceiver;
- protected int quorumSize;
public AppendNodeEntryHandler() {}
@@ -150,7 +149,4 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
this.directReceiver = follower;
}
- public void setQuorumSize(int quorumSize) {
- this.quorumSize = quorumSize;
- }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 44939e9f61..07f3bc3929 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -187,6 +187,26 @@ public class LogDispatcher {
return new DispatcherThread(node, logBlockingQueue);
}
+ public void applyNewNodes() {
+ allNodes = newNodes;
+ newNodes = null;
+
+ List<Peer> nodesToRemove = new ArrayList<>();
+ for (Entry<Peer, ExecutorService> entry : executorServices.entrySet()) {
+ if (!allNodes.contains(entry.getKey())) {
+ nodesToRemove.add(entry.getKey());
+ }
+ }
+ for (Peer peer : nodesToRemove) {
+ ExecutorService executorService = executorServices.remove(peer);
+ executorService.shutdownNow();
+ nodesRate.remove(peer);
+ nodesRateLimiter.remove(peer);
+ nodesEnabled.remove(peer);
+ nodesLogQueuesMap.remove(peer);
+ }
+ }
+
protected class DispatcherThread implements Runnable {
Peer receiver;
@@ -312,12 +332,11 @@ public class LogDispatcher {
}
public AppendNodeEntryHandler getAppendNodeEntryHandler(
- VotingEntry log, Peer node, int quorumSize) {
+ VotingEntry log, Peer node) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setDirectReceiver(node);
handler.setLog(log);
handler.setMember(member);
- handler.setQuorumSize(quorumSize);
return handler;
}
@@ -329,7 +348,7 @@ public class LogDispatcher {
singleEntryHandlers = new ArrayList<>(batch.size());
for (VotingEntry sendLogRequest : batch) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(sendLogRequest, receiver, sendLogRequest.getQuorumSize());
+ getAppendNodeEntryHandler(sendLogRequest, receiver);
singleEntryHandlers.add(handler);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index ddf771a41b..befe8ef655 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
+import java.util.function.Consumer;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
@@ -45,8 +47,9 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
LogApplier applier,
String name,
IStateMachine stateMachine,
- RaftConfig config) {
- super(stableEntryManager, applier, name, stateMachine, config);
+ RaftConfig config,
+ Consumer<List<Entry>> unappliedEntryExaminer) {
+ super(stableEntryManager, applier, name, stateMachine, config, unappliedEntryExaminer);
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 6c1be21199..3fdcf963d2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
+import java.util.function.Consumer;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.IStateMachine;
@@ -111,7 +112,8 @@ public abstract class RaftLogManager {
LogApplier applier,
String name,
IStateMachine stateMachine,
- RaftConfig config) {
+ RaftConfig config,
+ Consumer<List<Entry>> unappliedEntryExaminer) {
this.logApplier = applier;
this.name = name;
this.stateMachine = stateMachine;
@@ -119,7 +121,7 @@ public abstract class RaftLogManager {
this.config = config;
initConf();
- initEntries();
+ initEntries(unappliedEntryExaminer);
this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
@@ -154,9 +156,11 @@ public abstract class RaftLogManager {
}
}
- private void initEntries() {
+ private void initEntries(Consumer<List<Entry>> unappliedEntryExaminer) {
LogManagerMeta meta = stableEntryManager.getMeta();
List<Entry> allEntriesAfterAppliedIndex = stableEntryManager.getAllEntriesAfterAppliedIndex();
+ unappliedEntryExaminer.accept(allEntriesAfterAppliedIndex);
+
entries = new ArrayList<>();
if (!allEntriesAfterAppliedIndex.isEmpty()) {
entries.addAll(allEntriesAfterAppliedIndex);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index c8dc1d2933..4df1d4faf2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -206,16 +206,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
}
- @TestOnly
- String getLogDir() {
- return logDir;
- }
-
- @TestOnly
- File getMetaFile() {
- return metaFile;
- }
-
/** for log tools */
@Override
public LogManagerMeta getMeta() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index 9b22f4cc00..08793449fd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -179,6 +179,13 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
resultHandler.onComplete(member.requestCommitIndex());
}
+ @Override
+ public void forceElection(TConsensusGroupId groupId, AsyncMethodCallback<TSStatus> resultHandler)
+ throws TException {
+ RaftMember member = getMember(groupId);
+ resultHandler.onComplete(member.forceElection());
+ }
+
@Override
public void readFile(
String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
index 94a30dd51e..dc1ec45817 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -28,4 +28,5 @@ public class NodeUtils {
nodeUnion.addAll(newNodes);
return nodeUnion;
}
+
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
index d7f0323627..ba01f7983f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.consensus.natraft.utils;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse.Builder;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient.Factory;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.checkerframework.checker.units.qual.C;
public class StatusUtils {
@@ -90,4 +95,17 @@ public class StatusUtils {
newStatus.setRedirectNode(redirectedNode);
return newStatus;
}
+
+ /**
+ * Converts a TSStatus into a ConsensusGenericResponse.
+ *
+ * @param status the status to convert
+ * @return a response marked successful when the status code is SUCCESS_STATUS; otherwise a
+ * failed response carrying a ConsensusException whose message is "code:message"
+ */
+ public static ConsensusGenericResponse toGenericResponse(TSStatus status) {
+ Builder builder = ConsensusGenericResponse.newBuilder();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ builder.setSuccess(true);
+ } else {
+ builder.setSuccess(false);
+ builder.setException(
+ new ConsensusException(String.format("%d:%s", status.getCode(), status.getMessage())));
+ }
+
+ // Build from the populated builder; a fresh builder would drop the success flag and exception.
+ return builder.build();
+ }
}
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index a518a239e0..e2e6fa7e80 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -166,14 +166,16 @@ service RaftService {
void ping()
/**
- * Ask the leader for its commit index, used to check whether the node has caught up with the
- * leader.
- **/
- RequestCommitIndexResponse requestCommitIndex(1:common.TConsensusGroupId groupId)
-
- /**
- * Read a chunk of a file from the client. If the remaining of the file does not have enough
- * bytes, only the remaining will be returned.
- **/
- binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
+ * Ask the leader for its commit index, used to check whether the node has caught up with the
+ * leader.
+ **/
+ RequestCommitIndexResponse requestCommitIndex(1:common.TConsensusGroupId groupId)
+
+ /**
+ * Read a chunk of a file from the client. If the remaining of the file does not have enough
+ * bytes, only the remaining will be returned.
+ **/
+ binary readFile(1:string filePath, 2:i64 offset, 3:i32 length)
+
+ common.TSStatus forceElection(1:common.TConsensusGroupId groupId)
}
\ No newline at end of file