You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text view.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/02 09:07:43 UTC
[iotdb] branch native_raft updated: update election
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 2a506c5570 update election
2a506c5570 is described below
commit 2a506c55708848ebd7d40ab2ded49b9222238ec9
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Mar 2 17:09:11 2023 +0800
update election
---
.../iotdb/consensus/natraft/RaftConsensus.java | 48 +++++++----
.../consensus/natraft/protocol/RaftMember.java | 20 +++--
.../protocol/heartbeat/ElectionRespHandler.java | 50 +++---------
.../natraft/protocol/heartbeat/ElectionState.java | 81 ++++++++++++++++++
.../protocol/heartbeat/HeartbeatThread.java | 95 +++++++++-------------
.../protocol/log/appender/BlockingLogAppender.java | 72 +++++++++++++---
.../natraft/protocol/log/catchup/CatchUpTask.java | 2 +-
.../protocol/log/dispatch/LogDispatcher.java | 14 +---
.../protocol/log/logtype/ConfigChangeEntry.java | 8 ++
.../manager/DirectorySnapshotRaftLogManager.java | 8 +-
.../protocol/log/manager/RaftLogManager.java | 3 +-
.../protocol/log/snapshot/DirectorySnapshot.java | 48 +++++------
.../natraft/protocol/log/snapshot/Snapshot.java | 39 +++++++++
.../natraft/service/RaftRPCServiceProcessor.java | 48 ++++++++++-
.../iotdb/consensus/natraft/utils/NodeUtils.java | 13 +++
15 files changed, 376 insertions(+), 173 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 416bf1dca3..3aad5abedc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -179,20 +179,9 @@ public class RaftConsensus implements IConsensus {
}
}
- @Override
- public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
- int consensusGroupSize = peers.size();
- if (consensusGroupSize == 0) {
- return ConsensusGenericResponse.newBuilder()
- .setException(new IllegalPeerNumException(consensusGroupSize))
- .build();
- }
- Peer thisPeer = new Peer(groupId, thisNodeId, thisNode);
- if (!peers.contains(thisPeer)) {
- return ConsensusGenericResponse.newBuilder()
- .setException(new IllegalPeerEndpointException(thisNode, peers))
- .build();
- }
+
+ public boolean createNewMemberIfAbsent(ConsensusGroupId groupId, Peer thisPeer,
+ List<Peer> peers, List<Peer> newPeers) {
AtomicBoolean exist = new AtomicBoolean(true);
stateMachineMap.computeIfAbsent(
groupId,
@@ -205,11 +194,29 @@ public class RaftConsensus implements IConsensus {
}
RaftMember impl =
new RaftMember(
- path, config, thisPeer, peers, null, groupId, registry.apply(groupId), clientManager);
+ path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId), clientManager);
impl.start();
return impl;
});
- if (exist.get()) {
+ return !exist.get();
+ }
+
+ @Override
+ public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
+ int consensusGroupSize = peers.size();
+ if (consensusGroupSize == 0) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new IllegalPeerNumException(consensusGroupSize))
+ .build();
+ }
+ Peer thisPeer = new Peer(groupId, thisNodeId, thisNode);
+ if (!peers.contains(thisPeer)) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new IllegalPeerEndpointException(thisNode, peers))
+ .build();
+ }
+
+ if (!createNewMemberIfAbsent(groupId, thisPeer, peers, null)) {
return ConsensusGenericResponse.newBuilder()
.setException(new ConsensusGroupAlreadyExistException(groupId))
.build();
@@ -297,4 +304,13 @@ public class RaftConsensus implements IConsensus {
public RaftMember getMember(ConsensusGroupId groupId) {
return stateMachineMap.get(groupId);
}
+
+
+ /** Returns the node id this RaftConsensus uses for its local Peer. */
+ public int getThisNodeId() {
+ return thisNodeId;
+ }
+
+ /** Returns the endpoint this RaftConsensus uses for its local Peer. */
+ public TEndPoint getThisNode() {
+ return thisNode;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index b6fe3de5e5..02e14a5ce0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -144,7 +144,7 @@ public class RaftMember {
/**
* the nodes that belong to the same raft group as thisNode.
*/
- protected List<Peer> allNodes;
+ protected volatile List<Peer> allNodes;
protected volatile List<Peer> newNodes;
protected ConsensusGroupId groupId;
@@ -1015,7 +1015,7 @@ public class RaftMember {
DirectorySnapshot directorySnapshot;
try {
- directorySnapshot = new DirectorySnapshot(null, null);
+ directorySnapshot = new DirectorySnapshot();
directorySnapshot.deserialize(snapshotBytes);
directorySnapshot.setSource(source);
directorySnapshot.setMemberName(name);
@@ -1140,6 +1140,11 @@ public class RaftMember {
return newNodes;
}
+  /**
+   * Installs the configuration being switched to (null when no change is in progress) on both
+   * this member and its log dispatcher.
+   */
+  public void setNewNodes(List<Peer> newNodes) {
+    // pass the incoming list: at this point this.newNodes still holds the previous value
+    logDispatcher.setNewNodes(newNodes);
+    this.newNodes = newNodes;
+  }
+
public AsyncRaftServiceClient getHeartbeatClient(TEndPoint node) {
try {
return clientManager.borrowClient(node);
@@ -1158,7 +1163,7 @@ public class RaftMember {
}
}
- public TSStatus changeConfig(List<Peer> newConfig) {
+ public TSStatus changeConfig(List<Peer> newNodes) {
TSStatus tsStatus = ensureLeader(null);
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return tsStatus;
@@ -1168,11 +1173,11 @@ public class RaftMember {
VotingEntry votingEntry;
try {
logManager.getLock().writeLock().lock();
- if (newNodes != null) {
+ if (this.newNodes != null) {
return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode()).setMessage(
"Last configuration change in progress");
}
- ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newConfig);
+ ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newNodes);
Entry lastEntry = logManager.getLastEntry();
long lastIndex = lastEntry.getCurrLogIndex();
long lastTerm = lastEntry.getCurrLogTerm();
@@ -1184,15 +1189,14 @@ public class RaftMember {
logManager.append(Collections.singletonList(e));
votingEntry = LogUtils.buildVotingLog(e, this);
- logDispatcher.setNewNodes(newNodes);
- this.newNodes = newNodes;
+ setNewNodes(newNodes);
logDispatcher.offer(votingEntry);
} finally {
logManager.getLock().writeLock().unlock();
}
- List<Peer> addedNodes = NodeUtils.computeAddedNodes(oldNodes, newNodes);
+ List<Peer> addedNodes = NodeUtils.computeAddedNodes(oldNodes, this.newNodes);
for (Peer addedNode : addedNodes) {
catchUp(addedNode, 0);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
index 52192083ef..301cb19ad4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
@@ -47,59 +47,36 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
private String memberName;
private Peer voter;
private long currTerm;
- private AtomicInteger requiredVoteNum;
- private AtomicBoolean terminated;
- // when set to true, the elector wins the election
- private AtomicBoolean electionValid;
- private AtomicInteger failingVoteCounter;
+ private ElectionState electionState;
+ /**
+ * @param raftMember the member running the election
+ * @param voter the node whose vote this handler receives
+ * @param currTerm the term of the election
+ * @param electionState vote tracker shared by the handlers of all voters in this election round
+ */
 public ElectionRespHandler(
 RaftMember raftMember,
 Peer voter,
 long currTerm,
- AtomicInteger requiredVoteNum,
- AtomicBoolean terminated,
- AtomicBoolean electionValid,
- AtomicInteger failingVoteCounter) {
+ ElectionState electionState) {
 this.raftMember = raftMember;
 this.voter = voter;
 this.currTerm = currTerm;
- this.requiredVoteNum = requiredVoteNum;
- this.terminated = terminated;
- this.electionValid = electionValid;
+ this.electionState = electionState;
 this.memberName = raftMember.getName();
- this.failingVoteCounter = failingVoteCounter;
 }
@Override
public void onComplete(Long resp) {
long voterResp = resp;
- if (terminated.get()) {
- // a voter has rejected this election, which means the term or the log id falls behind
- // this node is not able to be the leader
- logger.info(
- "{}: Terminated election received a election response {} from {}",
- memberName,
- voterResp,
- voter);
+ if (electionState.isAccepted() || electionState.isRejected()) {
return;
}
if (voterResp == RESPONSE_AGREE) {
- long remaining = requiredVoteNum.decrementAndGet();
+ electionState.onAccept(voter);
logger.info(
- "{}: Received a grant vote from {}, remaining votes to succeed: {}",
+ "{}: Received a grant vote from {}",
memberName,
- voter,
- remaining);
- if (remaining == 0) {
+ voter);
+ if (electionState.isAccepted()) {
// the election is valid
- electionValid.set(true);
- terminated.set(true);
- synchronized (terminated) {
- terminated.notifyAll();
- }
logger.info("{}: Election {} is won", memberName, currTerm);
}
// still need more votes
@@ -121,8 +98,7 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
voterResp);
raftMember.stepDown(voterResp, null);
// the election is rejected
- terminated.set(true);
- terminated.notifyAll();
+ electionState.setRejected(true);
}
}
}
@@ -141,12 +117,6 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
}
+ /** Reports that no vote could be obtained from "voter"; electionState counts it as a rejection. */
 private void onFail() {
- int failingVoteRemaining = failingVoteCounter.decrementAndGet();
- if (failingVoteRemaining <= 0) {
- synchronized (terminated) {
- // wake up heartbeat thread to start the next election
- terminated.notifyAll();
- }
- }
+ electionState.onReject(voter);
 }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
new file mode 100644
index 0000000000..407252fe2c
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.consensus.common.Peer;
+
+/**
+ * Vote bookkeeping for one election round. The candidate wins once enough voters of the current
+ * configuration, and of the new configuration if one is being switched to, have granted their
+ * votes; it gives up as soon as either configuration can no longer grant it enough votes. Votes
+ * are reported from asynchronous RPC callbacks, so every mutation synchronizes on this object,
+ * which is also the monitor the election thread waits on and is notified through.
+ *
+ * <p>Vote thresholds exclude the candidate itself, which is assumed to belong to the
+ * configurations and to vote for itself.
+ */
+public class ElectionState {
+  private final List<Peer> currNodes;
+  private final List<Peer> newNodes;
+  private final Set<Peer> acceptedCurrNodes;
+  private final Set<Peer> acceptedNewNodes;
+  private final Set<Peer> rejectedCurrNodes;
+  private final Set<Peer> rejectedNewNodes;
+  private volatile boolean accepted = false;
+  private volatile boolean rejected = false;
+
+  /**
+   * @param currNodes the current configuration
+   * @param newNodes the configuration being switched to, or null if there is none
+   */
+  public ElectionState(List<Peer> currNodes, List<Peer> newNodes) {
+    this.currNodes = currNodes;
+    this.newNodes = newNodes;
+    acceptedCurrNodes = new HashSet<>(currNodes.size());
+    rejectedCurrNodes = new HashSet<>(currNodes.size());
+    acceptedNewNodes = newNodes != null ? new HashSet<>(newNodes.size()) : null;
+    rejectedNewNodes = newNodes != null ? new HashSet<>(newNodes.size()) : null;
+  }
+
+  /** Votes (besides the candidate's own) needed from a configuration of the given size. */
+  private static int votesToWin(int configSize) {
+    return configSize / 2;
+  }
+
+  /**
+   * Failed or refused votes after which a configuration of the given size can no longer supply
+   * {@link #votesToWin}: the other {@code configSize - 1} voters minus the needed votes, plus one.
+   */
+  private static int rejectionsToLose(int configSize) {
+    return configSize - configSize / 2;
+  }
+
+  /**
+   * Records a vote granted by "node" and marks the election accepted (waking waiters) once every
+   * configuration has granted enough votes. Ignored once the outcome is decided, so a late vote
+   * cannot turn a rejected election into an accepted one.
+   */
+  public synchronized void onAccept(Peer node) {
+    if (accepted || rejected) {
+      return;
+    }
+    if (currNodes.contains(node)) {
+      acceptedCurrNodes.add(node);
+    }
+    if (newNodes != null && newNodes.contains(node)) {
+      acceptedNewNodes.add(node);
+    }
+    if (acceptedCurrNodes.size() >= votesToWin(currNodes.size())
+        && (newNodes == null || acceptedNewNodes.size() >= votesToWin(newNodes.size()))) {
+      accepted = true;
+      notifyAll();
+    }
+  }
+
+  /**
+   * Records that no vote could be obtained from "node" and marks the election rejected (waking
+   * waiters) as soon as either configuration can no longer grant enough votes. Ignored once the
+   * outcome is decided.
+   */
+  public synchronized void onReject(Peer node) {
+    if (accepted || rejected) {
+      return;
+    }
+    if (currNodes.contains(node)) {
+      rejectedCurrNodes.add(node);
+    }
+    if (newNodes != null && newNodes.contains(node)) {
+      rejectedNewNodes.add(node);
+    }
+    if (rejectedCurrNodes.size() >= rejectionsToLose(currNodes.size())
+        || (newNodes != null && rejectedNewNodes.size() >= rejectionsToLose(newNodes.size()))) {
+      rejected = true;
+      notifyAll();
+    }
+  }
+
+  public List<Peer> getCurrNodes() {
+    return currNodes;
+  }
+
+  public List<Peer> getNewNodes() {
+    return newNodes;
+  }
+
+  public boolean isAccepted() {
+    return accepted;
+  }
+
+  public boolean isRejected() {
+    return rejected;
+  }
+
+  /** Sets the rejected flag directly (e.g. after a voter's reply made this node step down). */
+  public synchronized void setRejected(boolean rejected) {
+    this.rejected = rejected;
+    notifyAll();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index ef64f15e42..5939f1c577 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -19,12 +19,14 @@
package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
+import java.util.Collections;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
+import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
import org.apache.iotdb.consensus.raft.thrift.HeartBeatRequest;
@@ -166,7 +168,9 @@ public class HeartbeatThread implements Runnable {
logger.info("{}: End elections", memberName);
}
- /** Send each node (except the local node) in the group of the member a heartbeat. */
+ /**
+ * Send each node (except the local node) in the group of the member a heartbeat.
+ */
protected void sendHeartbeats() {
try {
localMember.getLogManager().getLock().readLock().lock();
@@ -182,7 +186,9 @@ public class HeartbeatThread implements Runnable {
sendHeartbeats(localMember.getAllNodes());
}
- /** Send each node (except the local node) in list a heartbeat. */
+ /**
+ * Send each node (except the local node) in list a heartbeat.
+ */
@SuppressWarnings("java:S2445")
private void sendHeartbeats(Collection<Peer> nodes) {
logger.debug(
@@ -266,13 +272,6 @@ public class HeartbeatThread implements Runnable {
return;
}
- int quorumNum = localMember.getAllNodes().size() / 2;
- // set to true when the election has a result (rejected or succeeded)
- AtomicBoolean electionTerminated = new AtomicBoolean(false);
- // set to true when the election is won
- AtomicBoolean electionValid = new AtomicBoolean(false);
- // a decreasing vote counter
- AtomicInteger quorum = new AtomicInteger(quorumNum);
long nextTerm;
try {
localMember.getLogManager().getLock().writeLock().lock();
@@ -287,11 +286,10 @@ public class HeartbeatThread implements Runnable {
}
// the number of votes needed to become a leader,
- // quorumNum should be equal to localMember.getAllNodes().size() / 2 + 1,
+ // currNodeQuorumNum should be equal to localMember.getAllNodes().size() / 2 + 1,
// but since it doesn’t need to vote for itself here, it directly decreases 1
- logger.info("{}: Election {} starts, quorum: {}", memberName, nextTerm, quorumNum);
- // NOTICE, failingVoteCounter should be equal to quorumNum + 1
- AtomicInteger failingVoteCounter = new AtomicInteger(quorumNum + 1);
+ logger.info("{}: Election {} starts", memberName, nextTerm);
+ // NOTICE, failingVoteCounter should be equal to currNodeQuorumNum + 1
electionRequest.setTerm(nextTerm);
electionRequest.setElector(localMember.getThisNode().getEndpoint());
@@ -300,21 +298,20 @@ public class HeartbeatThread implements Runnable {
electionRequest.setLastLogIndex(localMember.getLogManager().getLastLogIndex());
electionRequest.setGroupId(localMember.getRaftGroupId().convertToTConsensusGroupId());
+ ElectionState electionState = new ElectionState(localMember.getAllNodes(),
+ localMember.getNewNodes());
+
requestVote(
- localMember.getAllNodes(),
+ electionState,
electionRequest,
- nextTerm,
- quorum,
- electionTerminated,
- electionValid,
- failingVoteCounter);
+ nextTerm);
try {
logger.info(
"{}: Wait for {}ms until election time out", memberName, config.getElectionTimeoutMs());
- synchronized (electionTerminated) {
- electionWaitObject = electionTerminated;
- electionTerminated.wait(config.getElectionTimeoutMs());
+ synchronized (electionState) {
+ electionWaitObject = electionState;
+ electionState.wait(config.getElectionTimeoutMs());
}
electionWaitObject = null;
} catch (InterruptedException e) {
@@ -323,8 +320,7 @@ public class HeartbeatThread implements Runnable {
}
// if the election times out, the remaining votes do not matter
- electionTerminated.set(true);
- if (electionValid.get()) {
+ if (electionState.isAccepted()) {
logger.info("{}: Election {} accepted", memberName, nextTerm);
localMember.getStatus().setRole(RaftRole.LEADER);
localMember.getStatus().getLeader().set(localMember.getThisNode());
@@ -332,44 +328,31 @@ public class HeartbeatThread implements Runnable {
}
 /**
- * Request a vote from each of the "nodes". Each for vote will decrease the counter "quorum" and
- * when it reaches 0, the flag "electionValid" and "electionTerminated" will be set to true. Any
- * against vote will set the flag "electionTerminated" to true and ends the election.
- *
- * @param nodes
- * @param request
- * @param nextTerm the term of the election
- * @param quorum
- * @param electionTerminated
- * @param electionValid
+ * Request a vote from every node of the current and the new configuration (the union of
+ * electionState's node lists), except the local node. Each response is reported to
+ * "electionState" through an ElectionRespHandler, and electionState decides whether the
+ * election is accepted or rejected.
+ *
+ * @param electionState vote tracker shared by all handlers of this election round
+ * @param request the election request sent to every voter
+ * @param nextTerm the term of the election
 */
 @SuppressWarnings("java:S2445")
 private void requestVote(
- Collection<Peer> nodes,
+ ElectionState electionState,
 ElectionRequest request,
- long nextTerm,
- AtomicInteger quorum,
- AtomicBoolean electionTerminated,
- AtomicBoolean electionValid,
- AtomicInteger failingVoteCounter) {
- synchronized (nodes) {
- // avoid concurrent modification
- for (Peer node : nodes) {
- if (node.equals(localMember.getThisNode())) {
- continue;
- }
+ long nextTerm) {
- ElectionRespHandler handler =
- new ElectionRespHandler(
- localMember,
- node,
- nextTerm,
- quorum,
- electionTerminated,
- electionValid,
- failingVoteCounter);
- requestVoteAsync(node, handler, request);
+ Collection<Peer> peers = NodeUtils.unionNodes(electionState.getCurrNodes(),
+ electionState.getNewNodes());
+ // NOTE(review): the old synchronized (nodes) guard is gone; confirm the collection returned
+ // by unionNodes() is not modified concurrently while it is iterated here
+ for (Peer node : peers) {
+ if (node.equals(localMember.getThisNode())) {
+ continue;
 }
+
+ ElectionRespHandler handler =
+ new ElectionRespHandler(
+ localMember,
+ node,
+ nextTerm,
+ electionState);
+ requestVoteAsync(node, handler, request);
 }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index 6c5e54d1d6..d8b3ae16f3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.appender;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
@@ -60,7 +61,7 @@ public class BlockingLogAppender implements LogAppender {
* and append "log" to it. Otherwise report a log mismatch.
*
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
public AppendEntryResult appendEntry(AppendEntryRequest request, Entry log) {
long resp = checkPrevLogIndex(request.prevLogIndex);
@@ -106,7 +107,9 @@ public class BlockingLogAppender implements LogAppender {
return result;
}
- /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+ /**
+ * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+ */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
@@ -151,7 +154,7 @@ public class BlockingLogAppender implements LogAppender {
*
* @param logs append logs
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> logs) {
logger.debug(
@@ -171,18 +174,21 @@ public class BlockingLogAppender implements LogAppender {
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
}
+ ConfigChangeEntry lastConfigEntry = null;
+ for (Entry log : logs) {
+ if (log instanceof ConfigChangeEntry) {
+ lastConfigEntry = ((ConfigChangeEntry) log);
+ }
+ }
+
AppendEntryResult result = new AppendEntryResult();
long startWaitingTime = System.currentTimeMillis();
while (true) {
- synchronized (logManager) {
- // TODO: Consider memory footprint to execute a precise rejection
- if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
- <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
- resp =
- logManager.maybeAppend(
- request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
- break;
- }
+ if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
+ <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
+ resp = lastConfigEntry == null ? appendWithoutConfigChange(request, logs, result)
+ : appendWithConfigChange(request, logs, result, lastConfigEntry);
+ break;
}
try {
@@ -197,6 +203,46 @@ public class BlockingLogAppender implements LogAppender {
}
}
+ return result;
+ }
+
+ /**
+ * Appends "logs", which contain at least one ConfigChangeEntry, while holding the log manager's
+ * write lock, and on success installs the new peers of "configChangeEntry" (the last config
+ * change in the batch) through member.setNewNodes().
+ *
+ * @return the value returned by maybeAppend(); -1 means the logs did not match and were rejected
+ */
+ protected long appendWithConfigChange(AppendEntriesRequest request, List<Entry> logs,
+ AppendEntryResult result, ConfigChangeEntry configChangeEntry) {
+ long resp;
+ try {
+ logManager.getLock().writeLock().lock();
+ resp =
+ logManager.maybeAppend(
+ request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+
+ if (resp != -1) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{} append a new log list {}, commit to {}",
+ member.getName(),
+ logs,
+ request.leaderCommit);
+ }
+
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
+ // the new configuration is installed as soon as its entry is appended, under the same lock
+ // NOTE(review): nothing here reverts newNodes if this entry is later truncated; confirm
+ member.setNewNodes(configChangeEntry.getNewPeers());
+ } else {
+ // the incoming log points to an illegal position, reject it
+ result.status = Response.RESPONSE_LOG_MISMATCH;
+ }
+ } finally {
+ logManager.getLock().writeLock().unlock();
+ }
+ return resp;
+ }
+
+ protected long appendWithoutConfigChange(AppendEntriesRequest request, List<Entry> logs,
+ AppendEntryResult result) {
+ long resp =
+ logManager.maybeAppend(
+ request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
if (resp != -1) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -213,7 +259,7 @@ public class BlockingLogAppender implements LogAppender {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
}
- return result;
+ return resp;
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
index e7ae9e2dca..15ecbca8b9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
@@ -299,7 +299,7 @@ public class CatchUpTask implements Runnable {
}
private void doSnapshot() {
- raftMember.getLogManager().takeSnapshot();
+ raftMember.getLogManager().takeSnapshot(raftMember);
snapshot = raftMember.getLogManager().getSnapshot(peerInfo.getMatchIndex());
if (logger.isInfoEnabled()) {
logger.info(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index f7f5c7ba10..44939e9f61 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -89,7 +91,7 @@ public class LogDispatcher {
}
this.allNodes = member.getAllNodes();
this.newNodes = member.getNewNodes();
- createQueueAndBindingThreads(unionNodes());
+ createQueueAndBindingThreads(unionNodes(allNodes, newNodes));
}
public void updateRateLimiter() {
@@ -99,15 +101,7 @@ public class LogDispatcher {
}
}
- private Collection<Peer> unionNodes() {
- if (newNodes == null) {
- return allNodes;
- }
- Set<Peer> nodeUnion = new HashSet<>();
- nodeUnion.addAll(allNodes);
- nodeUnion.addAll(newNodes);
- return nodeUnion;
- }
+
void createQueue(Peer node) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 8f63f3142f..6cecfa1b9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -77,4 +77,12 @@ public class ConfigChangeEntry extends Entry {
newPeers.add(Peer.deserialize(buffer));
}
}
+
+ /** Returns the peers of the configuration this entry switches to (the stored list, not a copy). */
+ public List<Peer> getNewPeers() {
+ return newPeers;
+ }
+
+ /** Returns the peers of the configuration this entry switches from (the stored list, not a copy). */
+ public List<Peer> getOldPeers() {
+ return oldPeers;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index d6a032d32a..ddf771a41b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
@@ -53,7 +55,7 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
}
@Override
- public void takeSnapshot() {
+ public void takeSnapshot(RaftMember member) {
latestSnapshotDir =
new File(
config.getStorageDir()
@@ -61,17 +63,19 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
+ getName()
+ "-snapshot-"
+ System.currentTimeMillis());
+ List<Peer> currNodes;
try {
lock.readLock().lock();
snapshotIndex = getAppliedIndex();
snapshotTerm = getAppliedTerm();
+ currNodes = member.getAllNodes();
} finally {
lock.readLock().unlock();
}
stateMachine.takeSnapshot(latestSnapshotDir);
List<Path> snapshotFiles = stateMachine.getSnapshotFiles(latestSnapshotDir);
snapshotFiles.addAll(IOUtils.collectPaths(latestSnapshotDir));
- directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles);
+ directorySnapshot = new DirectorySnapshot(latestSnapshotDir, snapshotFiles, currNodes);
directorySnapshot.setLastLogIndex(snapshotIndex);
directorySnapshot.setLastLogTerm(snapshotTerm);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 682bec242a..6c1be21199 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
import org.apache.iotdb.consensus.natraft.protocol.HardState;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.EmptyEntry;
@@ -199,7 +200,7 @@ public abstract class RaftLogManager {
*
* @throws IOException timeout exception
*/
- public abstract void takeSnapshot();
+ public abstract void takeSnapshot(RaftMember member);
/**
* Update the raftNode's hardState(currentTerm,voteFor) and flush to disk.
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index d1fd917684..359bb1393b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -4,13 +4,16 @@
package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
+import java.io.DataOutputStream;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,43 +37,40 @@ public class DirectorySnapshot extends Snapshot {
private TEndPoint source;
private String memberName;
- public DirectorySnapshot(File directory, List<Path> filePaths) {
+ /** Creates an empty snapshot; RaftMember fills it by calling deserialize() on received bytes. */
+ public DirectorySnapshot() {
+
+ }
+
+ /**
+ * @param directory the directory the state machine wrote the snapshot into
+ * @param filePaths the files that make up the snapshot
+ * @param peers the group members recorded with the snapshot (stored as currNodes)
+ */
+ public DirectorySnapshot(File directory, List<Path> filePaths, List<Peer> peers) {
 this.directory = directory;
 this.filePaths = filePaths;
+ this.currNodes = peers;
 }
+  /**
+   * Serializes the common snapshot fields (via serializeBase), then the snapshot directory and
+   * the snapshot file paths. Each string is written as an int UTF-8 byte length followed by those
+   * bytes, which is the layout the previous serialize() produced and deserialize() reads back.
+   */
 @Override
 public ByteBuffer serialize() {
- byte[] rootBytes = directory.getAbsolutePath().getBytes(StandardCharsets.UTF_8);
- int bufferSize = Long.BYTES * 2 + rootBytes.length + Integer.BYTES * 2;
- byte[][] filePathBytes = new byte[filePaths.size()][];
- for (int i = 0; i < filePaths.size(); i++) {
- Path path = filePaths.get(i);
- byte[] bytes = path.toString().getBytes(StandardCharsets.UTF_8);
- filePathBytes[i] = bytes;
- bufferSize += Integer.BYTES;
- bufferSize += bytes.length;
- }
+    PublicBAOS baos = new PublicBAOS();
+    DataOutputStream dataOutputStream = new DataOutputStream(baos);
- ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- buffer.putLong(lastLogIndex);
- buffer.putLong(lastLogTerm);
- buffer.putInt(rootBytes.length);
- buffer.put(rootBytes);
- buffer.putInt(filePaths.size());
- for (byte[] relativeFileByte : filePathBytes) {
- buffer.putInt(relativeFileByte.length);
- buffer.put(relativeFileByte);
+    serializeBase(dataOutputStream);
+
+    try {
+      writeUtf8(dataOutputStream, directory.getAbsolutePath());
+      dataOutputStream.writeInt(filePaths.size());
+      for (Path filePath : filePaths) {
+        writeUtf8(dataOutputStream, filePath.toString());
+      }
+    } catch (IOException e) {
+      // PublicBAOS only writes to memory, so this is not expected; never return a truncated buffer
+      throw new IllegalStateException("Failed to serialize snapshot " + directory, e);
 }
- buffer.flip();
- return buffer;
+    return ByteBuffer.wrap(baos.getBuf(), 0, baos.size());
 }
+
+  /**
+   * Writes "str" as its UTF-8 byte length (int) followed by those bytes. Unlike
+   * DataOutputStream.writeBytes(), this records the length and keeps non-Latin-1 characters.
+   */
+  private static void writeUtf8(DataOutputStream out, String str) throws IOException {
+    byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+    out.writeInt(bytes.length);
+    out.write(bytes);
+  }
@Override
public void deserialize(ByteBuffer buffer) {
- lastLogIndex = buffer.getLong();
- lastLogTerm = buffer.getLong();
+ deserializeBase(buffer);
+
int size = buffer.getInt();
byte[] bytes = new byte[size];
buffer.get(bytes);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
index 8414ad5483..b5aa2ae455 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
@@ -19,7 +19,12 @@
package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import java.nio.ByteBuffer;
@@ -35,6 +40,7 @@ public abstract class Snapshot {
protected long lastLogIndex;
protected long lastLogTerm;
+  /** Group members recorded with this snapshot; written by serializeBase, read by deserializeBase. */
+  protected List<Peer> currNodes;
public abstract ByteBuffer serialize();
@@ -71,4 +77,37 @@ public abstract class Snapshot {
public String toString() {
return String.format("%d-%d", lastLogIndex, lastLogTerm);
}
+
+  /** @return the group members recorded with this snapshot (null if never set) */
+  public List<Peer> getCurrNodes() {
+    return currNodes;
+  }
+
+  /** @param currNodes the group members to record with this snapshot */
+  public void setCurrNodes(List<Peer> currNodes) {
+    this.currNodes = currNodes;
+  }
+
+  /**
+   * Writes the fields shared by all snapshots: lastLogIndex, lastLogTerm, then the number of
+   * recorded peers followed by each peer. A snapshot whose peers were never set is written with a
+   * peer count of 0, so {@link #deserializeBase(ByteBuffer)} restores it as an empty list.
+   *
+   * @throws IllegalStateException if the underlying stream fails, instead of silently producing a
+   *     truncated snapshot
+   */
+  protected void serializeBase(DataOutputStream dataOutputStream) {
+    try {
+      dataOutputStream.writeLong(lastLogIndex);
+      dataOutputStream.writeLong(lastLogTerm);
+
+      if (currNodes == null) {
+        // e.g. a snapshot built via a no-arg constructor that never had its peers set
+        dataOutputStream.writeInt(0);
+        return;
+      }
+      dataOutputStream.writeInt(currNodes.size());
+      for (Peer currNode : currNodes) {
+        currNode.serialize(dataOutputStream);
+      }
+    } catch (IOException e) {
+      // cannot happen for in-memory streams, but the caller may pass any DataOutputStream
+      throw new IllegalStateException("Failed to serialize snapshot base", e);
+    }
+  }
+
+  /**
+   * Reads the fields written by serializeBase, in the same order: lastLogIndex, lastLogTerm, a
+   * peer count, then that many peers. The buffer position is advanced past everything read.
+   */
+  protected void deserializeBase(ByteBuffer buffer) {
+    this.lastLogIndex = buffer.getLong();
+    this.lastLogTerm = buffer.getLong();
+
+    final int peerCount = buffer.getInt();
+    this.currNodes = new ArrayList<>(peerCount);
+    int read = 0;
+    while (read < peerCount) {
+      this.currNodes.add(Peer.deserialize(buffer));
+      read++;
+    }
+  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index cefecbf9a2..9b22f4cc00 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -21,11 +21,16 @@ package org.apache.iotdb.consensus.natraft.service;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.natraft.RaftConsensus;
import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
import org.apache.iotdb.consensus.natraft.utils.IOUtils;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
@@ -56,7 +61,8 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
this.consensus = consensus;
}
- public void handleClientExit() {}
+  public void handleClientExit() {
+    // Nothing is released when a client exits; this callback is currently a no-op.
+  }
private RaftMember getMember(TConsensusGroupId groupId) throws TException {
RaftMember member = consensus.getMember(Factory.createFromTConsensusGroupId(groupId));
@@ -66,6 +72,44 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
return member;
}
+  /**
+   * Returns the first ConfigChangeEntry carried by the request, or null if there is none (or the
+   * request has no entries).
+   *
+   * <p>Each entry is parsed from a {@link ByteBuffer#duplicate()} so that the positions of the
+   * request's own buffers are left untouched for the later {@code member.appendEntries(request)}
+   * call. NOTE(review): this assumes LogParser.parse advances the buffer position — confirm.
+   *
+   * @throws UnknownLogTypeException if an entry has an unknown log type
+   */
+  private ConfigChangeEntry findFirstConfigChangeEntry(AppendEntriesRequest request)
+      throws UnknownLogTypeException {
+    if (request.entries == null) {
+      return null;
+    }
+    LogParser parser = LogParser.getINSTANCE();
+    for (ByteBuffer entryBuffer : request.entries) {
+      Entry entry = parser.parse(entryBuffer.duplicate());
+      if (entry instanceof ConfigChangeEntry) {
+        return (ConfigChangeEntry) entry;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the associated member. If this node does not host one yet, create it from the FIRST
+   * config change entry in the request (if any), using that entry's old and new peers.
+   *
+   * @throws NoMemberException if no member exists and none could be created from the request
+   * @throws TException if an entry in the request cannot be parsed (cause preserved)
+   */
+  private RaftMember getMemberOrCreate(TConsensusGroupId tgroupId, AppendEntriesRequest request)
+      throws TException {
+    ConsensusGroupId groupId = Factory.createFromTConsensusGroupId(tgroupId);
+    RaftMember member = consensus.getMember(groupId);
+    if (member != null) {
+      // common path: the member already exists; previously this fell through to the throw below
+      return member;
+    }
+    try {
+      ConfigChangeEntry firstConfigChangeEntry = findFirstConfigChangeEntry(request);
+      if (firstConfigChangeEntry != null) {
+        Peer thisPeer = new Peer(groupId, consensus.getThisNodeId(), consensus.getThisNode());
+        consensus.createNewMemberIfAbsent(
+            groupId,
+            thisPeer,
+            firstConfigChangeEntry.getOldPeers(),
+            firstConfigChangeEntry.getNewPeers());
+        member = consensus.getMember(groupId);
+      }
+    } catch (UnknownLogTypeException e) {
+      throw new TException(e.getMessage(), e);
+    }
+    if (member == null) {
+      // no config change entry to bootstrap from, or creation did not register a member
+      throw new NoMemberException("No such member of: " + tgroupId);
+    }
+    return member;
+  }
+
@Override
public void sendHeartbeat(
HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler)
@@ -89,7 +133,7 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
public void appendEntries(
AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
throws TException {
- RaftMember member = getMember(request.groupId);
+ RaftMember member = getMemberOrCreate(request.groupId, request);
try {
resultHandler.onComplete(member.appendEntries(request));
} catch (UnknownLogTypeException e) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
index 229a9bc81e..94a30dd51e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -1,7 +1,10 @@
package org.apache.iotdb.consensus.natraft.utils;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.iotdb.consensus.common.Peer;
public class NodeUtils {
@@ -15,4 +18,14 @@ public class NodeUtils {
}
return addedNode;
}
+
+  /**
+   * Returns the union of two node lists.
+   *
+   * <p>If {@code newNodes} is null, {@code currNodes} is returned as-is. If {@code currNodes} is
+   * null, {@code newNodes} is returned as-is. Otherwise the result is a new HashSet holding the
+   * nodes of both lists: duplicates (by Peer equals/hashCode) are removed and iteration order is
+   * unspecified.
+   */
+  public static Collection<Peer> unionNodes(List<Peer> currNodes, List<Peer> newNodes) {
+    if (newNodes == null) {
+      return currNodes;
+    }
+    if (currNodes == null) {
+      // previously an NPE from addAll(null)
+      return newNodes;
+    }
+    Set<Peer> nodeUnion = new HashSet<>(currNodes);
+    nodeUnion.addAll(newNodes);
+    return nodeUnion;
+  }
}