You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/01 03:09:50 UTC
[iotdb] branch native_raft updated: temp save
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 96bb581305 temp save
96bb581305 is described below
commit 96bb58130552866561de859c24e75ed05350636b
Author: Tian Jiang <jt...@163.com>
AuthorDate: Wed Mar 1 11:11:18 2023 +0800
temp save
---
.../iotdb/consensus/natraft/RaftConsensus.java | 3 +-
.../consensus/natraft/protocol/RaftMember.java | 325 +++++++++++++--------
.../consensus/natraft/protocol/log/Entry.java | 3 +-
.../natraft/protocol/log/VotingEntry.java | 183 ++++++++++++
.../consensus/natraft/protocol/log/VotingLog.java | 119 --------
.../log/dispatch/AppendNodeEntryHandler.java | 28 +-
.../protocol/log/dispatch/LogDispatcher.java | 118 +++++---
.../protocol/log/dispatch/VotingLogList.java | 106 ++++---
.../log/dispatch/flowcontrol/FlowBalancer.java | 4 +-
.../protocol/log/logtype/ConfigChangeEntry.java | 80 +++++
.../protocol/log/sequencing/LogSequencer.java | 7 +-
.../log/sequencing/LogSequencerFactory.java | 2 +-
.../log/sequencing/SynchronousSequencer.java | 89 +-----
.../iotdb/consensus/natraft/utils/LogUtils.java | 81 +++++
.../iotdb/consensus/natraft/utils/NodeUtils.java | 18 ++
15 files changed, 727 insertions(+), 439 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 03ff338c79..416bf1dca3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -126,6 +126,7 @@ public class RaftConsensus implements IConsensus {
config,
new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
+ null,
consensusGroupId,
registry.apply(consensusGroupId),
clientManager);
@@ -204,7 +205,7 @@ public class RaftConsensus implements IConsensus {
}
RaftMember impl =
new RaftMember(
- path, config, thisPeer, peers, groupId, registry.apply(groupId), clientManager);
+ path, config, thisPeer, peers, null, groupId, registry.apply(groupId), clientManager);
impl.start();
return impl;
});
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index e0019b89c5..b6fe3de5e5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1,27 +1,28 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-
-
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+
+
+ */
package org.apache.iotdb.consensus.natraft.protocol;
+import java.util.Collections;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
@@ -47,7 +48,7 @@ import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatReqHandler
import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatThread;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.appender.BlockingLogAppender;
import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppender;
import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppenderFactory;
@@ -56,8 +57,10 @@ import org.apache.iotdb.consensus.natraft.protocol.log.applier.BaseApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.catchup.CatchUpManager;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.VotingLogList;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.VotingLogList.AcceptedType;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowBalancer;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.RequestEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogCallback;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogTask;
@@ -69,6 +72,8 @@ import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencerFa
import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSequencer;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
import org.apache.iotdb.consensus.natraft.utils.IOUtils;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
@@ -108,7 +113,6 @@ public class RaftMember {
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
private RaftConfig config;
- private boolean enableWeakAcceptance;
protected static final LogAppenderFactory appenderFactory = new BlockingLogAppender.Factory();
protected static final LogSequencerFactory SEQUENCER_FACTORY = new SynchronousSequencer.Factory();
@@ -119,7 +123,6 @@ public class RaftMember {
private static final String MSG_NO_LEADER_COMMIT_INDEX =
"{}: Cannot request commit index from {}";
private static final String MSG_NO_LEADER_IN_SYNC = "{}: No leader is found when synchronizing";
- public static final String MSG_LOG_IS_ACCEPTED = "{}: log {} is accepted";
/**
* when the leader of this node changes, the condition will be notified so other threads that wait
@@ -127,17 +130,22 @@ public class RaftMember {
*/
private final Object waitLeaderCondition = new Object();
- /** the lock is to make sure that only one thread can apply snapshot at the same time */
+ /**
+ * the lock is to make sure that only one thread can apply snapshot at the same time
+ */
private final Lock snapshotApplyLock = new ReentrantLock();
/**
* when the commit progress is updated by a heartbeat, this object is notified so that we may know
* if this node is up-to-date with the leader, and whether the given consistency is reached
*/
- private Object syncLock = new Object();
+ private final Object syncLock = new Object();
protected Peer thisNode;
- /** the nodes that belong to the same raft group as thisNode. */
+ /**
+ * the nodes that belong to the same raft group as thisNode.
+ */
protected List<Peer> allNodes;
+ protected volatile List<Peer> newNodes;
protected ConsensusGroupId groupId;
protected String name;
@@ -145,7 +153,9 @@ public class RaftMember {
protected RaftStatus status = new RaftStatus();
- /** the raft logs are all stored and maintained in the log manager */
+ /**
+ * the raft logs are all stored and maintained in the log manager
+ */
protected RaftLogManager logManager;
protected HeartbeatThread heartbeatThread;
@@ -157,11 +167,6 @@ public class RaftMember {
* candidates for weak consistency reads and provide snapshots for the new data holders
*/
volatile boolean readOnly = false;
- /**
- * lastLogIndex when generating the previous member report, to show the log ingestion rate of the
- * member by comparing it with the current last log index.
- */
- long lastReportedLogIndex;
/**
* client manager that provides reusable Thrift clients to connect to other RaftMembers and
@@ -170,12 +175,15 @@ public class RaftMember {
protected IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
protected CatchUpManager catchUpManager;
- /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
+ /**
+ * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
+ */
private ExecutorService commitLogPool;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
- * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
+ * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
+ * logs.
*/
private volatile LogDispatcher logDispatcher;
@@ -191,6 +199,7 @@ public class RaftMember {
RaftConfig config,
Peer thisNode,
List<Peer> allNodes,
+ List<Peer> newNodes,
ConsensusGroupId groupId,
IStateMachine stateMachine,
IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager) {
@@ -205,6 +214,7 @@ public class RaftMember {
} else {
persistConfiguration();
}
+ this.newNodes = newNodes;
this.groupId = groupId;
this.name =
@@ -223,9 +233,9 @@ public class RaftMember {
name,
stateMachine,
config);
- this.votingLogList = new VotingLogList(allNodes.size() / 2, this);
+ this.votingLogList = new VotingLogList(this);
this.logAppender = appenderFactory.create(this, config);
- this.logSequencer = SEQUENCER_FACTORY.create(this, logManager, config);
+ this.logSequencer = SEQUENCER_FACTORY.create(this, config);
this.logDispatcher = new LogDispatcher(this, config);
this.heartbeatReqHandler = new HeartbeatReqHandler(this);
this.electionReqHandler = new ElectionReqHandler(this);
@@ -305,7 +315,7 @@ public class RaftMember {
}
private void initConfig() {
- this.enableWeakAcceptance = config.isEnableWeakAcceptance();
+ votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
}
public void initPeerMap() {
@@ -451,7 +461,9 @@ public class RaftMember {
return appendEntriesInternal(request);
}
- /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+ /**
+ * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
+ */
private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
@@ -563,11 +575,7 @@ public class RaftMember {
return Objects.equals(status.leader.get(), thisNode);
}
- public TSStatus processRequest(IConsensusRequest request) {
- if (readOnly) {
- return StatusUtils.NODE_READ_ONLY;
- }
-
+ private TSStatus ensureLeader(IConsensusRequest request) {
if (getLeader() == null) {
waitLeader();
}
@@ -576,10 +584,25 @@ public class RaftMember {
Peer leader = getLeader();
if (leader == null) {
return StatusUtils.NO_LEADER;
- } else {
+ } else if (request != null) {
return forwardRequest(request, leader.getEndpoint(), leader.getGroupId());
+ } else {
+ return new TSStatus().setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
+ .setRedirectNode(leader.getEndpoint());
}
}
+ return StatusUtils.OK;
+ }
+
+ public TSStatus processRequest(IConsensusRequest request) {
+ if (readOnly) {
+ return StatusUtils.NODE_READ_ONLY;
+ }
+
+ TSStatus tsStatus = ensureLeader(request);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return tsStatus;
+ }
logger.debug("{}: Processing request {}", name, request);
Entry entry = new RequestEntry(request);
@@ -593,37 +616,16 @@ public class RaftMember {
}
// assign term and index to the new log and append it
- VotingLog sendLogRequest = logSequencer.sequence(entry);
+ VotingEntry votingEntry = logSequencer.sequence(entry);
if (config.isUseFollowerLoadBalance()) {
FlowMonitorManager.INSTANCE.report(thisNode, entry.estimateSize());
}
- if (sendLogRequest == null) {
+ if (votingEntry == null) {
return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
}
- try {
- AppendLogResult appendLogResult =
- waitAppendResult(sendLogRequest, sendLogRequest.getQuorumSize());
- switch (appendLogResult) {
- case WEAK_ACCEPT:
- return includeLogNumbersInStatus(
- StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), entry);
- case OK:
- waitApply(entry);
- return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), entry);
- case TIME_OUT:
- logger.debug("{}: log {} timed out...", name, entry);
- break;
- case LEADERSHIP_STALE:
- // abort the appending, the new leader will fix the local logs by catch-up
- default:
- break;
- }
- } catch (LogExecutionException e) {
- return handleLogExecutionException(entry, IOUtils.getRootCause(e));
- }
- return StatusUtils.getStatus(TSStatusCode.TIME_OUT);
+ return waitForEntryResult(votingEntry);
}
protected void waitApply(Entry entry) throws LogExecutionException {
@@ -650,23 +652,16 @@ public class RaftMember {
getRaftGroupId() + "-" + entry.getCurrLogIndex() + "-" + entry.getCurrLogTerm());
}
- protected AppendLogResult waitAppendResult(VotingLog log, int quorumSize) {
+ protected AppendLogResult waitAppendResult(VotingEntry votingEntry) {
// wait for the followers to vote
- int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
- int weaklyAccepted = log.getWeaklyAcceptedNodes().size();
- int stronglyAccepted = totalAccepted - weaklyAccepted;
-
- if (log.getEntry().getCurrLogIndex() == Long.MIN_VALUE
- || ((stronglyAccepted < quorumSize
- || (!enableWeakAcceptance || (totalAccepted < quorumSize)) && !log.isHasFailed()))) {
- waitAppendResultLoop(log, quorumSize);
+
+ AcceptedType acceptedType = votingLogList.computeAcceptedType(votingEntry);
+ if (votingLogList.computeAcceptedType(votingEntry) == AcceptedType.NOT_ACCEPTED) {
+ acceptedType = waitAppendResultLoop(votingEntry);
}
- totalAccepted = votingLogList.totalAcceptedNodeNum(log);
- weaklyAccepted = log.getWeaklyAcceptedNodes().size();
- stronglyAccepted = totalAccepted - weaklyAccepted;
// a node has a larger status.term than the local node, so this node is no longer a valid leader
- if (status.term.get() != log.getEntry().getCurrLogTerm()) {
+ if (status.term.get() != votingEntry.getEntry().getCurrLogTerm()) {
return AppendLogResult.LEADERSHIP_STALE;
}
// the node knows it is no longer the leader from other requests
@@ -674,18 +669,17 @@ public class RaftMember {
return AppendLogResult.LEADERSHIP_STALE;
}
- if (totalAccepted >= quorumSize && stronglyAccepted < quorumSize) {
+ if (acceptedType == AcceptedType.WEAKLY_ACCEPTED) {
return AppendLogResult.WEAK_ACCEPT;
}
- // cannot get enough agreements within a certain amount of time
- if (totalAccepted < quorumSize) {
- logger.info("{} failed because {} < {}", log, totalAccepted, quorumSize);
- return AppendLogResult.TIME_OUT;
+ if (acceptedType == AcceptedType.STRONGLY_ACCEPTED) {
+ return AppendLogResult.OK;
}
- // voteCounter has counted down to zero
- return AppendLogResult.OK;
+ // cannot get enough agreements within a certain amount of time
+ logger.info("{} failed", votingEntry);
+ return AppendLogResult.TIME_OUT;
}
protected TSStatus handleLogExecutionException(Object log, Throwable cause) {
@@ -705,23 +699,22 @@ public class RaftMember {
* one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
*/
@SuppressWarnings({"java:S2445"}) // safe synchronized
- private void waitAppendResultLoop(VotingLog log, int quorumSize) {
- int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
- int weaklyAccepted = log.getWeaklyAcceptedNodes().size();
- int stronglyAccepted = totalAccepted - weaklyAccepted;
- long nextTimeToPrint = 5000;
+ private AcceptedType waitAppendResultLoop(VotingEntry log) {
+ long nextTimeToPrint = 5000;
long waitStart = System.nanoTime();
long alreadyWait = 0;
String threadBaseName = Thread.currentThread().getName();
+ if (logger.isDebugEnabled()) {
+ Thread.currentThread()
+ .setName(threadBaseName + "-waiting-" + log.getEntry().getCurrLogIndex());
+ }
long waitTime = 1;
+ AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
synchronized (log.getEntry()) {
- while (log.getEntry().getCurrLogIndex() == Long.MIN_VALUE
- || (stronglyAccepted < quorumSize
- && (!(enableWeakAcceptance && (totalAccepted >= quorumSize))
- && alreadyWait < config.getWriteOperationTimeoutMS()
- && !log.isHasFailed()))) {
+ while (votingLogList.computeAcceptedType(log) == AcceptedType.NOT_ACCEPTED
+ && alreadyWait < config.getWriteOperationTimeoutMS()) {
try {
log.getEntry().wait(waitTime);
} catch (InterruptedException e) {
@@ -739,9 +732,6 @@ public class RaftMember {
alreadyWait);
nextTimeToPrint *= 2;
}
- totalAccepted = votingLogList.totalAcceptedNodeNum(log);
- weaklyAccepted = log.getWeaklyAcceptedNodes().size();
- stronglyAccepted = totalAccepted - weaklyAccepted;
}
}
if (logger.isDebugEnabled()) {
@@ -755,6 +745,7 @@ public class RaftMember {
log.getWeaklyAcceptedNodes(),
alreadyWait);
}
+ return acceptedType;
}
public ConsensusWriteResponse executeForwardedRequest(IConsensusRequest request) {
@@ -790,32 +781,31 @@ public class RaftMember {
}
}
- public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
+ public void syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
if (status.role == RaftRole.LEADER) {
- return true;
+ return;
}
waitLeader();
if (status.leader.get() == null || status.leader.get() == null) {
// the leader has not been elected, we must assume the node falls behind
logger.warn(MSG_NO_LEADER_IN_SYNC, name);
- return false;
+ return;
}
if (status.role == RaftRole.LEADER) {
- return true;
+ return;
}
logger.debug("{}: try synchronizing with the leader {}", name, status.leader.get());
- return waitUntilCatchUp(checkConsistency);
+ waitUntilCatchUp(checkConsistency);
}
/**
* Request the leader's commit index and wait until the local commit index becomes not less than
* it.
*
- * @return true if this node has caught up before timeout, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
- protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
+ protected void waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
long leaderCommitId = Long.MIN_VALUE;
RequestCommitIndexResponse response;
@@ -826,7 +816,7 @@ public class RaftMember {
tryUpdateCommitIndex(
response.getTerm(), response.getCommitLogIndex(), response.getCommitLogTerm());
- return syncLocalApply(leaderCommitId, true);
+ syncLocalApply(leaderCommitId, true);
} catch (TException e) {
logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, status.leader.get(), e);
} catch (InterruptedException e) {
@@ -838,17 +828,15 @@ public class RaftMember {
}
}
logger.debug("Start to sync with leader, leader commit id is {}", leaderCommitId);
- return false;
}
/**
* sync local applyId to leader commitId
*
* @param leaderCommitId leader commit id
- * @param fastFail if enabled, when log differ too much, return false directly.
- * @return true if leaderCommitId <= localAppliedId
+ * @param fastFail if enabled, when log differ too much, return false directly.
*/
- public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
+ public void syncLocalApply(long leaderCommitId, boolean fastFail) {
long startTime = System.currentTimeMillis();
long waitedTime = 0;
long localAppliedId;
@@ -856,7 +844,7 @@ public class RaftMember {
if (fastFail && leaderCommitId - logManager.getAppliedIndex() > config.getMaxSyncLogLag()) {
logger.info(
"{}: The raft log of this member is too backward to provide service directly.", name);
- return false;
+ return;
}
while (waitedTime < config.getSyncLeaderMaxWaitMs()) {
@@ -870,7 +858,7 @@ public class RaftMember {
logger.debug(
"{}: synchronized to target index {} after {}ms", name, leaderCommitId, waitedTime);
}
- return true;
+ return;
}
// wait for next heartbeat to catch up
// the local node will not perform a commit here according to the leaderCommitId because
@@ -889,10 +877,11 @@ public class RaftMember {
name,
leaderCommitId,
waitedTime);
- return false;
}
- /** Wait until the leader of this node becomes known or time out. */
+ /**
+ * Wait until the leader of this node becomes known or time out.
+ */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (status.leader.get() == null || status.leader.get() == null) {
@@ -929,7 +918,9 @@ public class RaftMember {
return handler.getResult(config.getConnectionTimeoutInMS());
}
- /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
+ /**
+ * @return true if there is a log whose index is "index" and term is "term", false otherwise
+ */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -939,14 +930,14 @@ public class RaftMember {
/**
* Forward a non-query plan to a node using the default client.
*
- * @param plan a non-query plan
- * @param node cannot be the local node
+ * @param plan a non-query plan
+ * @param node cannot be the local node
* @param groupId must be set for data group communication, set to null for meta group
- * communication
+ * communication
* @return a TSStatus indicating if the forwarding is successful.
*/
public TSStatus forwardRequest(IConsensusRequest plan, TEndPoint node, ConsensusGroupId groupId) {
- if (node == null || node.equals(thisNode)) {
+ if (node == null || node.equals(thisNode.getEndpoint())) {
logger.debug("{}: plan {} has no where to be forwarded", name, plan);
return StatusUtils.NO_LEADER;
}
@@ -957,7 +948,7 @@ public class RaftMember {
if (status.getCode() == TSStatusCode.TIME_OUT.getStatusCode()
&& (groupId == null || groupId.equals(getRaftGroupId()))
&& (this.status.leader.get() != null)
- && this.status.leader.get().equals(node)) {
+ && this.status.leader.get().getEndpoint().equals(node)) {
// leader is down, trigger a new election by resetting heartbeat
heartbeatThread.setLastHeartbeatReceivedTime(-1);
this.status.leader.set(null);
@@ -969,7 +960,7 @@ public class RaftMember {
/**
* Forward a non-query plan to "receiver" using "client".
*
- * @param plan a non-query plan
+ * @param plan a non-query plan
* @param groupId to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
@@ -1105,8 +1096,8 @@ public class RaftMember {
return logDispatcher;
}
- public VotingLog buildVotingLog(Entry e) {
- return new VotingLog(e, allNodes.size(), null, allNodes.size() / 2, config);
+ public VotingEntry buildVotingLog(Entry e) {
+ return new VotingEntry(e, null, allNodes, newNodes, config);
}
public HeartbeatThread getHeartbeatThread() {
@@ -1145,6 +1136,10 @@ public class RaftMember {
return allNodes;
}
+ public List<Peer> getNewNodes() {
+ return newNodes;
+ }
+
public AsyncRaftServiceClient getHeartbeatClient(TEndPoint node) {
try {
return clientManager.borrowClient(node);
@@ -1162,4 +1157,113 @@ public class RaftMember {
return null;
}
}
+
+  /**
+   * Starts a membership change from the current configuration to {@code newConfig}.
+   *
+   * <p>A {@link ConfigChangeEntry} is appended to the local log and dispatched to the followers.
+   * While the change is pending, {@code newNodes} holds {@code newConfig}. Nodes that exist only
+   * in the new configuration are caught up before waiting for the entry to be accepted.
+   *
+   * @param newConfig the complete member list of the target configuration
+   * @return the replication result of the configuration-change entry; the non-success status of
+   *     ensureLeader(null) if this node is not the leader; CONFIGURATION_ERROR if a change is pending
+   */
+  public TSStatus changeConfig(List<Peer> newConfig) {
+    TSStatus tsStatus = ensureLeader(null);
+    if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return tsStatus;
+    }
+
+    List<Peer> oldNodes = new ArrayList<>(allNodes);
+    VotingEntry votingEntry;
+    // lock before the try so that a failed lock() is never followed by unlock()
+    logManager.getLock().writeLock().lock();
+    try {
+      if (newNodes != null) {
+        return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode())
+            .setMessage("Last configuration change in progress");
+      }
+      ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newConfig);
+      Entry lastEntry = logManager.getLastEntry();
+      long lastIndex = lastEntry.getCurrLogIndex();
+      long lastTerm = lastEntry.getCurrLogTerm();
+
+      e.setCurrLogTerm(getStatus().getTerm().get());
+      e.setCurrLogIndex(lastIndex + 1);
+      e.setPrevTerm(lastTerm);
+
+      logManager.append(Collections.singletonList(e));
+
+      // record the target configuration before building the voting entry so that the new nodes
+      // also vote on it (NOTE(review): assumes LogUtils.buildVotingLog reads this member's node
+      // lists, as buildVotingLog(Entry) does -- confirm)
+      this.newNodes = newConfig;
+      logDispatcher.setNewNodes(newConfig);
+      votingEntry = LogUtils.buildVotingLog(e, this);
+
+      logDispatcher.offer(votingEntry);
+    } finally {
+      logManager.getLock().writeLock().unlock();
+    }
+
+    List<Peer> addedNodes = NodeUtils.computeAddedNodes(oldNodes, newConfig);
+    for (Peer addedNode : addedNodes) {
+      catchUp(addedNode, 0);
+    }
+
+    return waitForEntryResult(votingEntry);
+  }
+
+  /**
+   * Waits for {@code votingEntry} to be accepted and converts the outcome into a status:
+   * WEAKLY_ACCEPTED on weak acceptance, OK once the entry has been applied after strong
+   * acceptance, and TIME_OUT on a timeout or stale leadership.
+   *
+   * @param votingEntry the sequenced entry to wait for
+   * @return the status to report back to the requester
+   */
+  private TSStatus waitForEntryResult(VotingEntry votingEntry) {
+    Entry entry = votingEntry.getEntry();
+    try {
+      AppendLogResult appendLogResult = waitAppendResult(votingEntry);
+      switch (appendLogResult) {
+        case WEAK_ACCEPT:
+          return includeLogNumbersInStatus(
+              StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), entry);
+        case OK:
+          waitApply(entry);
+          return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), entry);
+        case TIME_OUT:
+          logger.debug("{}: log {} timed out...", name, entry);
+          break;
+        case LEADERSHIP_STALE:
+          // abort the appending, the new leader will fix the local logs by catch-up
+        default:
+          break;
+      }
+    } catch (LogExecutionException e) {
+      return handleLogExecutionException(entry, IOUtils.getRootCause(e));
+    }
+    return StatusUtils.getStatus(TSStatusCode.TIME_OUT);
+  }
+
+  /**
+   * Adds {@code newPeer} to this group by changing the configuration to the current members plus
+   * {@code newPeer}.
+   *
+   * @param newPeer the peer to add
+   * @return OK ("Peer already exists") if it is already a member, otherwise the result of
+   *     {@link #changeConfig(List)}
+   */
+  public TSStatus addPeer(Peer newPeer) {
+    List<Peer> currentNodes = getAllNodes();
+    if (currentNodes.contains(newPeer)) {
+      return StatusUtils.OK.deepCopy().setMessage("Peer already exists");
+    }
+
+    List<Peer> newPeers = new ArrayList<>(currentNodes);
+    newPeers.add(newPeer);
+    return changeConfig(newPeers);
+  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 1a357441d5..1748518078 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -61,7 +61,8 @@ public abstract class Entry implements Comparable<Entry> {
public enum Types {
// DO CHECK LogParser when you add a new type of log
CLIENT_REQUEST,
- EMPTY
+ EMPTY,
+ CONFIG_CHANGE
}
public long getCurrLogIndex() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
new file mode 100644
index 0000000000..30be376f74
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+public class VotingEntry {
+
+ protected Entry entry;
+ // for NB-Raft
+ protected Set<Peer> weaklyAcceptedNodes;
+ private boolean hasFailed;
+ private AppendEntryRequest appendEntryRequest;
+ private Future<ByteBuffer> serializedLogFuture;
+ protected List<Peer> currNodes;
+ protected List<Peer> newNodes;
+ private boolean isStronglyAccepted;
+
+ public VotingEntry(
+ Entry entry,
+ AppendEntryRequest appendEntryRequest,
+ List<Peer> currNodes,
+ List<Peer> newNodes,
+ RaftConfig config) {
+ this.entry = entry;
+ if (config.isUseFollowerSlidingWindow()) {
+ weaklyAcceptedNodes = new HashSet<>(
+ currNodes.size() + (newNodes != null ? newNodes.size() : 0));
+ }
+ this.setAppendEntryRequest(appendEntryRequest);
+ this.currNodes = currNodes;
+ this.newNodes = newNodes;
+ }
+
+ public VotingEntry(VotingEntry another) {
+ this.entry = another.entry;
+ this.weaklyAcceptedNodes = another.weaklyAcceptedNodes;
+ this.setAppendEntryRequest(another.appendEntryRequest);
+ this.setSerializedLogFuture(another.getSerializedLogFuture());
+ this.currNodes = another.currNodes;
+ this.newNodes = another.newNodes;
+ }
+
+ public Entry getEntry() {
+ return entry;
+ }
+
+ public void setEntry(Entry entry) {
+ this.entry = entry;
+ }
+
+ public Set<Peer> getWeaklyAcceptedNodes() {
+ return weaklyAcceptedNodes != null ? weaklyAcceptedNodes : Collections.emptySet();
+ }
+
+ public void addWeaklyAcceptedNodes(Peer node) {
+ weaklyAcceptedNodes.add(node);
+ }
+
+ @Override
+ public String toString() {
+ return entry.toString();
+ }
+
+ public AppendEntryRequest getAppendEntryRequest() {
+ return appendEntryRequest;
+ }
+
+ public void setAppendEntryRequest(AppendEntryRequest appendEntryRequest) {
+ this.appendEntryRequest = appendEntryRequest;
+ }
+
+ public Future<ByteBuffer> getSerializedLogFuture() {
+ return serializedLogFuture;
+ }
+
+ public void setSerializedLogFuture(Future<ByteBuffer> serializedLogFuture) {
+ this.serializedLogFuture = serializedLogFuture;
+ }
+
+ public int currNodesQuorumNum() {
+ return currNodes.size() / 2 + 1;
+ }
+
+ public int newNodesQuorumNum() {
+ return newNodes != null ? newNodes.size() / 2 + 1 : 0;
+ }
+
+ public boolean isStronglyAccepted(Map<Peer, Long> stronglyAcceptedIndices) {
+ if (isStronglyAccepted) {
+ return true;
+ }
+ int currNodeQuorumNum = currNodesQuorumNum();
+ int newNodeQuorumNum = newNodesQuorumNum();
+ boolean stronglyAcceptedByCurrNodes =
+ stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices) >= currNodeQuorumNum;
+ boolean stronglyAcceptedByNewNodes =
+ stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices) >= newNodeQuorumNum;
+ if (stronglyAcceptedByCurrNodes && stronglyAcceptedByNewNodes) {
+ isStronglyAccepted = true;
+ }
+ return stronglyAcceptedByCurrNodes && stronglyAcceptedByNewNodes;
+ }
+
+ public int stronglyAcceptedNumByCurrNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+ int num = 0;
+ for (Peer node : currNodes) {
+ if (stronglyAcceptedIndices.getOrDefault(node, -1L) >= entry.getCurrLogIndex()) {
+ num++;
+ }
+ }
+ return num;
+ }
+
+ public int stronglyAcceptedNumByNewNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+ if (!hasNewNodes()) {
+ return 0;
+ }
+ int num = 0;
+ for (Peer node : newNodes) {
+ if (stronglyAcceptedIndices.getOrDefault(node, -1L) >= entry.getCurrLogIndex()) {
+ num++;
+ }
+ }
+ return num;
+ }
+
+ public int weaklyAcceptedNumByCurrNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+ int num = 0;
+ for (Peer node : currNodes) {
+ if (weaklyAcceptedNodes.contains(node)
+ && stronglyAcceptedIndices.getOrDefault(node, -1L) < entry.getCurrLogIndex()) {
+ num++;
+ }
+ }
+ return num;
+ }
+
+ public int weaklyAcceptedNumByNewNodes(Map<Peer, Long> stronglyAcceptedIndices) {
+ if (!hasNewNodes()) {
+ return 0;
+ }
+ int num = 0;
+ for (Peer node : currNodes) {
+ if (weaklyAcceptedNodes.contains(node)
+ && stronglyAcceptedIndices.getOrDefault(node, -1L) < entry.getCurrLogIndex()) {
+ num++;
+ }
+ }
+ return num;
+ }
+
+ public boolean hasNewNodes() {
+ return newNodes != null;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingLog.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingLog.java
deleted file mode 100644
index 45022cd328..0000000000
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingLog.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.natraft.protocol.log;
-
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-public class VotingLog {
-
- protected Entry entry;
- // for NB-Raft
- protected Set<Peer> weaklyAcceptedNodes;
- protected Set<Peer> failedNodes;
- private boolean hasFailed;
- private AppendEntryRequest appendEntryRequest;
- private Future<ByteBuffer> serializedLogFuture;
- private int quorumSize;
-
- public VotingLog(
- Entry entry,
- int groupSize,
- AppendEntryRequest appendEntryRequest,
- int quorumSize,
- RaftConfig config) {
- this.entry = entry;
- failedNodes = new HashSet<>(groupSize);
- if (config.isUseFollowerSlidingWindow()) {
- weaklyAcceptedNodes = new HashSet<>(groupSize);
- }
- this.setAppendEntryRequest(appendEntryRequest);
- this.setQuorumSize(quorumSize);
- }
-
- public VotingLog(VotingLog another) {
- this.entry = another.entry;
- this.weaklyAcceptedNodes = another.weaklyAcceptedNodes;
- this.failedNodes = another.failedNodes;
- this.setAppendEntryRequest(another.appendEntryRequest);
- this.setQuorumSize(another.quorumSize);
- this.setSerializedLogFuture(another.getSerializedLogFuture());
- }
-
- public Entry getEntry() {
- return entry;
- }
-
- public void setEntry(Entry entry) {
- this.entry = entry;
- }
-
- public Set<Peer> getWeaklyAcceptedNodes() {
- return weaklyAcceptedNodes != null ? weaklyAcceptedNodes : Collections.emptySet();
- }
-
- @Override
- public String toString() {
- return entry.toString();
- }
-
- public Set<Peer> getFailedNodes() {
- return failedNodes;
- }
-
- public boolean isHasFailed() {
- return hasFailed;
- }
-
- public void setHasFailed(boolean hasFailed) {
- this.hasFailed = hasFailed;
- }
-
- public AppendEntryRequest getAppendEntryRequest() {
- return appendEntryRequest;
- }
-
- public void setAppendEntryRequest(AppendEntryRequest appendEntryRequest) {
- this.appendEntryRequest = appendEntryRequest;
- }
-
- public Future<ByteBuffer> getSerializedLogFuture() {
- return serializedLogFuture;
- }
-
- public void setSerializedLogFuture(Future<ByteBuffer> serializedLogFuture) {
- this.serializedLogFuture = serializedLogFuture;
- }
-
- public int getQuorumSize() {
- return quorumSize;
- }
-
- public void setQuorumSize(int quorumSize) {
- this.quorumSize = quorumSize;
- }
-}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index d00f70c43d..2b5d92d390 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.thrift.TApplicationException;
@@ -48,7 +48,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
protected RaftMember member;
- protected VotingLog log;
+ protected VotingEntry log;
protected Peer directReceiver;
protected int quorumSize;
@@ -56,10 +56,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
@Override
public void onComplete(AppendEntryResult response) {
- if (log.isHasFailed()) {
- return;
- }
-
Peer trueReceiver =
response.isSetReceiver()
? new Peer(
@@ -77,7 +73,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
member
.getVotingLogList()
.onStronglyAccept(
- log.getEntry().getCurrLogIndex(), log.getEntry().getCurrLogTerm(), trueReceiver);
+ log, trueReceiver);
member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
} else if (resp > 0) {
@@ -95,7 +91,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
} else if (resp == RESPONSE_WEAK_ACCEPT) {
synchronized (log) {
- log.getWeaklyAcceptedNodes().add(trueReceiver);
+ log.addWeaklyAcceptedNodes(trueReceiver);
log.notifyAll();
}
} else {
@@ -114,7 +110,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
log,
trueReceiver,
resp);
- onFail(trueReceiver);
}
}
// rejected because the receiver's logs are stale or the receiver has no cluster info, just
@@ -140,23 +135,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
logger.warn(
"{}: Cannot append log {} to {}", member.getName(), log, directReceiver, exception);
}
- onFail(directReceiver);
}
- private void onFail(Peer trueReceiver) {
- synchronized (log.getEntry()) {
- log.getFailedNodes().add(trueReceiver);
- if (log.getFailedNodes().size() > quorumSize) {
- // quorum members have failed, there is no need to wait for others
- logger.warn(
- "{} failed because too many replicas have failed: {}", log, log.getFailedNodes());
- log.setHasFailed(true);
- log.getEntry().notifyAll();
- }
- }
- }
+ /** Sets the voting entry whose append results (strong/weak acceptance) this handler reports. */
- public void setLog(VotingLog log) {
+ public void setLog(VotingEntry log) {
this.log = log;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 6b3787db73..f7f5c7ba10 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -27,7 +30,7 @@ import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
@@ -62,7 +65,9 @@ public class LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
protected RaftMember member;
private RaftConfig config;
- protected Map<Peer, BlockingQueue<VotingLog>> nodesLogQueuesMap = new HashMap<>();
+ protected List<Peer> allNodes;
+ protected List<Peer> newNodes;
+ protected Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = new HashMap<>();
protected Map<Peer, Boolean> nodesEnabled;
protected Map<Peer, RateLimiter> nodesRateLimiter = new HashMap<>();
protected Map<Peer, Double> nodesRate = new HashMap<>();
@@ -82,7 +87,9 @@ public class LogDispatcher {
if (!queueOrdered) {
maxBatchSize = 1;
}
- createQueueAndBindingThreads();
+ this.allNodes = member.getAllNodes();
+ this.newNodes = member.getNewNodes();
+ createQueueAndBindingThreads(unionNodes());
}
public void updateRateLimiter() {
@@ -92,33 +99,51 @@ public class LogDispatcher {
}
}
- void createQueueAndBindingThreads() {
- for (Peer node : member.getAllNodes()) {
- if (!node.equals(member.getThisNode())) {
- BlockingQueue<VotingLog> logBlockingQueue;
- logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
- nodesLogQueuesMap.put(node, logBlockingQueue);
- nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
- }
+ /**
+ * Returns every peer this dispatcher must serve: the current nodes alone when no new node set is
+ * present, otherwise the de-duplicated union of the current and new nodes.
+ */
+ private Collection<Peer> unionNodes() {
+ if (newNodes == null) {
+ return allNodes;
}
- updateRateLimiter();
+ Set<Peer> nodeUnion = new HashSet<>();
+ nodeUnion.addAll(allNodes);
+ nodeUnion.addAll(newNodes);
+ return nodeUnion;
+ }
+
+
+ void createQueue(Peer node) {
+ BlockingQueue<VotingEntry> logBlockingQueue;
+ logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
+ nodesLogQueuesMap.put(node, logBlockingQueue);
+ nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
for (int i = 0; i < bindingThreadNum; i++) {
- for (Entry<Peer, BlockingQueue<VotingLog>> pair : nodesLogQueuesMap.entrySet()) {
- executorServices
- .computeIfAbsent(
- pair.getKey(),
- n ->
- IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-"
- + member.getName()
- + "-"
- + pair.getKey().getEndpoint().getIp()
- + "-"
- + pair.getKey().getEndpoint().getPort()))
- .submit(newDispatcherThread(pair.getKey(), pair.getValue()));
+ executorServices
+ .computeIfAbsent(
+ node,
+ n -> createPool(node))
+ .submit(newDispatcherThread(node, logBlockingQueue));
+ }
+ }
+
+  /**
+   * Creates the cached thread pool that runs the dispatcher threads of one receiver. The pool
+   * name encodes the member name and the receiver's ip, port and node id.
+   *
+   * @param node the receiver the pool dispatches to
+   * @return a new cached thread pool
+   */
+  ExecutorService createPool(Peer node) {
+    String poolName =
+        "LogDispatcher-"
+            + member.getName()
+            + "-"
+            + node.getEndpoint().getIp()
+            + "-"
+            + node.getEndpoint().getPort()
+            + "-"
+            + node.getNodeId();
+    return IoTDBThreadPoolFactory.newCachedThreadPool(poolName);
+  }
+
+ /**
+ * Creates a log queue and its dispatcher threads for every given peer except this node, then
+ * refreshes the per-peer rate limiters.
+ *
+ * @param peers the peers to dispatch logs to; this node is skipped if present
+ */
+ void createQueueAndBindingThreads(Collection<Peer> peers) {
+ for (Peer node : peers) {
+ if (!node.equals(member.getThisNode())) {
+ createQueue(node);
}
}
+ updateRateLimiter();
}
@TestOnly
@@ -134,18 +159,18 @@ public class LogDispatcher {
resultHandlerThread.shutdownNow();
}
+ /**
+ * Appends {@code request} to one peer's queue. Uses {@link BlockingQueue#add}, which throws
+ * IllegalStateException instead of blocking when the bounded queue is full.
+ */
- protected boolean addToQueue(BlockingQueue<VotingLog> nodeLogQueue, VotingLog request) {
+ protected boolean addToQueue(BlockingQueue<VotingEntry> nodeLogQueue, VotingEntry request) {
return nodeLogQueue.add(request);
}
- public void offer(VotingLog request) {
+ public void offer(VotingEntry request) {
- for (Entry<Peer, BlockingQueue<VotingLog>> entry : nodesLogQueuesMap.entrySet()) {
+ for (Entry<Peer, BlockingQueue<VotingEntry>> entry : nodesLogQueuesMap.entrySet()) {
if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.getKey(), false)) {
continue;
}
- BlockingQueue<VotingLog> nodeLogQueue = entry.getValue();
+ BlockingQueue<VotingEntry> nodeLogQueue = entry.getValue();
try {
boolean addSucceeded = addToQueue(nodeLogQueue, request);
@@ -164,18 +189,18 @@ public class LogDispatcher {
}
}
+ /** Creates the runnable that takes entries from {@code logBlockingQueue} and sends them to {@code node}. */
- DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingLog> logBlockingQueue) {
+ DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue) {
return new DispatcherThread(node, logBlockingQueue);
}
protected class DispatcherThread implements Runnable {
Peer receiver;
- private final BlockingQueue<VotingLog> logBlockingDeque;
- protected List<VotingLog> currBatch = new ArrayList<>();
+ private final BlockingQueue<VotingEntry> logBlockingDeque;
+ protected List<VotingEntry> currBatch = new ArrayList<>();
private final String baseName;
- protected DispatcherThread(Peer receiver, BlockingQueue<VotingLog> logBlockingDeque) {
+ protected DispatcherThread(Peer receiver, BlockingQueue<VotingEntry> logBlockingDeque) {
this.receiver = receiver;
this.logBlockingDeque = logBlockingDeque;
baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
@@ -189,7 +214,7 @@ public class LogDispatcher {
try {
while (!Thread.interrupted()) {
synchronized (logBlockingDeque) {
- VotingLog poll = logBlockingDeque.take();
+ VotingEntry poll = logBlockingDeque.take();
currBatch.add(poll);
if (maxBatchSize > 1) {
while (!logBlockingDeque.isEmpty() && currBatch.size() < maxBatchSize) {
@@ -216,7 +241,7 @@ public class LogDispatcher {
}
protected void serializeEntries() throws InterruptedException {
- for (VotingLog request : currBatch) {
+ for (VotingEntry request : currBatch) {
request.getAppendEntryRequest().entry = request.getEntry().serialize();
request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
@@ -224,7 +249,7 @@ public class LogDispatcher {
}
private void appendEntriesAsync(
- List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingLog> currBatch) {
+ List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingEntry> currBatch) {
AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
try {
@@ -240,7 +265,7 @@ public class LogDispatcher {
}
protected AppendEntriesRequest prepareRequest(
- List<ByteBuffer> logList, List<VotingLog> currBatch, int firstIndex) {
+ List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
AppendEntriesRequest request = new AppendEntriesRequest();
request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -261,7 +286,7 @@ public class LogDispatcher {
return request;
}
- private void sendLogs(List<VotingLog> currBatch) throws TException {
+ private void sendLogs(List<VotingEntry> currBatch) throws TException {
int logIndex = 0;
logger.debug(
"send logs from index {} to {}",
@@ -293,7 +318,7 @@ public class LogDispatcher {
}
public AppendNodeEntryHandler getAppendNodeEntryHandler(
- VotingLog log, Peer node, int quorumSize) {
+ VotingEntry log, Peer node, int quorumSize) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setDirectReceiver(node);
handler.setLog(log);
@@ -306,9 +331,9 @@ public class LogDispatcher {
private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
- private AppendEntriesHandler(List<VotingLog> batch) {
+ private AppendEntriesHandler(List<VotingEntry> batch) {
singleEntryHandlers = new ArrayList<>(batch.size());
- for (VotingLog sendLogRequest : batch) {
+ for (VotingEntry sendLogRequest : batch) {
AppendNodeEntryHandler handler =
getAppendNodeEntryHandler(sendLogRequest, receiver, sendLogRequest.getQuorumSize());
singleEntryHandlers.add(handler);
@@ -335,7 +360,16 @@ public class LogDispatcher {
return nodesRate;
}
+ /** Returns the live (not copied) per-peer queue map, e.g. for flow balancing by queue length. */
- public Map<Peer, BlockingQueue<VotingLog>> getNodesLogQueuesMap() {
+ public Map<Peer, BlockingQueue<VotingEntry>> getNodesLogQueuesMap() {
return nodesLogQueuesMap;
}
+
+  /**
+   * Installs the node list of a new configuration and starts dispatching to peers that are not yet
+   * served by this dispatcher.
+   *
+   * <p>This node, members of the current configuration, and peers that already own a queue (e.g.
+   * from an earlier call) are skipped. Repeated calls therefore neither create a queue for the
+   * local node nor replace an existing queue and orphan the dispatcher threads bound to it.
+   *
+   * <p>NOTE(review): {@link #offer} iterates {@code nodesLogQueuesMap} (a HashMap) without
+   * locking; confirm this is never called concurrently with dispatching.
+   *
+   * @param newNodes peers of the new configuration; {@code null} means no new node set
+   */
+  public void setNewNodes(List<Peer> newNodes) {
+    this.newNodes = newNodes;
+    if (newNodes == null) {
+      return;
+    }
+    boolean queueCreated = false;
+    for (Peer newNode : newNodes) {
+      if (newNode.equals(member.getThisNode())
+          || allNodes.contains(newNode)
+          || nodesLogQueuesMap.containsKey(newNode)) {
+        continue;
+      }
+      createQueue(newNode);
+      queueCreated = true;
+    }
+    if (queueCreated) {
+      // same follow-up as createQueueAndBindingThreads after creating queues
+      updateRateLimiter();
+    }
+  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 40533616d5..7a097e1f4d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -19,39 +19,34 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
public class VotingLogList {
private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
- private int quorumSize;
private RaftMember member;
private Map<Peer, Long> stronglyAcceptedIndices = new ConcurrentHashMap<>();
private AtomicLong newCommitIndex = new AtomicLong(-1);
+ private boolean enableWeakAcceptance = false;
- public VotingLogList(int quorumSize, RaftMember member) {
- this.quorumSize = quorumSize;
+ /**
+ * @param member the raft member whose log manager receives commit-index updates
+ */
+ public VotingLogList(RaftMember member) {
this.member = member;
+ // this node's own acceptance is pre-recorded as Long.MAX_VALUE, so it counts toward the
+ // quorum of every entry index
+ stronglyAcceptedIndices.put(member.getThisNode(), Long.MAX_VALUE);
}
- private boolean tryCommit() {
+ private boolean tryCommit(VotingEntry entry) {
RaftLogManager logManager = member.getLogManager();
- if (computeNewCommitIndex()
+ if (computeNewCommitIndex(entry)
&& logManager != null
&& newCommitIndex.get() > logManager.getCommitLogIndex()) {
try {
@@ -65,15 +60,16 @@ public class VotingLogList {
}
}
- public boolean computeNewCommitIndex() {
- List<Entry<Peer, Long>> nodeIndices = new ArrayList<>(stronglyAcceptedIndices.entrySet());
- if (nodeIndices.size() < quorumSize) {
+ /**
+ * Raises the pending commit index to the index of {@code entry} if the entry is strongly
+ * accepted by the required quorum(s).
+ *
+ * @param entry the entry whose acceptance has just changed
+ * @return true if this call raised the pending commit index, false otherwise
+ */
+ public boolean computeNewCommitIndex(VotingEntry entry) {
+ long currLogIndex = entry.getEntry().getCurrLogIndex();
+ // already covered by a previously computed commit index
+ if (newCommitIndex.get() >= currLogIndex) {
+ return false;
+ }
+ if (entry.isStronglyAccepted(stronglyAcceptedIndices)) {
+ // atomic max: only the caller that actually raised the value returns true
+ return currLogIndex > newCommitIndex.getAndUpdate(ov -> Math.max(ov, currLogIndex));
+ } else {
return false;
}
- nodeIndices.sort(Entry.comparingByValue());
- Long value = nodeIndices.get(nodeIndices.size() - quorumSize).getValue();
- long oldValue = newCommitIndex.getAndUpdate(oldV -> Math.max(value, oldV));
- return value > oldValue;
}
/**
@@ -81,41 +77,28 @@ public class VotingLogList {
* all entries whose index <= the accepted entry. If any entry is accepted by a quorum, remove it
* from the list.
*
- * @param index
- * @param term
- * @param acceptingNode
* @return the lastly removed entry if any.
*/
- public void onStronglyAccept(long index, long term, Peer acceptingNode) {
- logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNode);
+ public void onStronglyAccept(VotingEntry entry, Peer acceptingNode) {
+ logger.debug("{} is strongly accepted by {}", entry, acceptingNode);
+ long currLogIndex = entry.getEntry().getCurrLogIndex();
+ // keep only the highest index reported by each peer, so a late, smaller acknowledgement
+ // never moves the peer's recorded index backwards
Long newIndex =
stronglyAcceptedIndices.compute(
acceptingNode,
(nid, oldIndex) -> {
if (oldIndex == null) {
- return index;
+ return currLogIndex;
} else {
- if (index > oldIndex) {
- return index;
+ if (currLogIndex > oldIndex) {
+ return currLogIndex;
}
return oldIndex;
}
});
- if (newIndex == index) {
- tryCommit();
- }
- }
-
- public int totalAcceptedNodeNum(VotingLog log) {
- long index = log.getEntry().getCurrLogIndex();
- int num = log.getWeaklyAcceptedNodes().size();
- for (Entry<Peer, Long> entry : stronglyAcceptedIndices.entrySet()) {
- if (entry.getValue() >= index) {
- num++;
- }
+ // try to commit only if the peer's recorded index equals this entry's index, i.e. no later
+ // acceptance from the same peer has already superseded it
+ if (newIndex == currLogIndex) {
+ tryCommit(entry);
}
- return num;
}
public String report() {
@@ -123,4 +106,43 @@ public class VotingLogList {
"Nodes accepted indices: %s, new commitIndex: %d",
stronglyAcceptedIndices, newCommitIndex.get());
}
+
+  /**
+   * Classifies how far {@code votingEntry} has progressed towards commitment.
+   *
+   * <ul>
+   *   <li>{@link AcceptedType#NOT_ACCEPTED} if the entry has no index yet ({@code Long.MIN_VALUE})
+   *       or the required quorums are not reached;
+   *   <li>{@link AcceptedType#STRONGLY_ACCEPTED} if its index is covered by the commit index
+   *       computed so far;
+   *   <li>{@link AcceptedType#WEAKLY_ACCEPTED} if weak acceptance is enabled and, for both the
+   *       current and the new node sets, strong plus weak acceptances reach the respective quorum.
+   * </ul>
+   */
+  public AcceptedType computeAcceptedType(VotingEntry votingEntry) {
+    long entryIndex = votingEntry.getEntry().getCurrLogIndex();
+    if (entryIndex == Long.MIN_VALUE) {
+      return AcceptedType.NOT_ACCEPTED;
+    }
+    if (entryIndex <= newCommitIndex.get()) {
+      return AcceptedType.STRONGLY_ACCEPTED;
+    }
+    if (!enableWeakAcceptance) {
+      return AcceptedType.NOT_ACCEPTED;
+    }
+
+    int currQuorum = votingEntry.currNodesQuorumNum();
+    int newQuorum = votingEntry.newNodesQuorumNum();
+    int strongCurr = votingEntry.stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+    int strongNew = votingEntry.stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+    int weakCurr = votingEntry.weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+    int weakNew = votingEntry.weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+
+    boolean currQuorumReached = strongCurr + weakCurr >= currQuorum;
+    boolean newQuorumReached = strongNew + weakNew >= newQuorum;
+    return currQuorumReached && newQuorumReached
+        ? AcceptedType.WEAKLY_ACCEPTED
+        : AcceptedType.NOT_ACCEPTED;
+  }
+
+ /**
+ * Enables or disables counting weak acceptances in computeAcceptedType. Disabled by default.
+ */
+ public void setEnableWeakAcceptance(boolean enableWeakAcceptance) {
+ this.enableWeakAcceptance = enableWeakAcceptance;
+ }
+
+ /** Acceptance state of a voting entry as classified by computeAcceptedType. */
+ public enum AcceptedType {
+ // no index assigned yet, or the required quorums are not reached
+ NOT_ACCEPTED,
+ // the entry's index is covered by the computed commit index
+ STRONGLY_ACCEPTED,
+ // quorums are reached only when weak acceptances are counted as well
+ WEAKLY_ACCEPTED
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 14a1b20eb8..3c838f81b7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
import org.slf4j.Logger;
@@ -88,7 +88,7 @@ public class FlowBalancer {
double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
double assumedFlow = thisNodeFlow * overestimateFactor;
logger.info("Flow of this node: {}", thisNodeFlow);
- Map<Peer, BlockingQueue<VotingLog>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
+ Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
// sort followers according to their queue length
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
new file mode 100644
index 0000000000..8f63f3142f
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log.logtype;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+
+/** Log entry that records a membership change from {@code oldPeers} to {@code newPeers}. */
+public class ConfigChangeEntry extends Entry {
+
+  private List<Peer> oldPeers;
+  private List<Peer> newPeers;
+
+  /**
+   * @param oldPeers the peers of the configuration being replaced
+   * @param newPeers the peers of the configuration being introduced
+   */
+  public ConfigChangeEntry(List<Peer> oldPeers, List<Peer> newPeers) {
+    this.oldPeers = oldPeers;
+    this.newPeers = newPeers;
+  }
+
+  /** Returns the peers of the configuration being replaced. */
+  public List<Peer> getOldPeers() {
+    return oldPeers;
+  }
+
+  /** Returns the peers of the configuration being introduced. */
+  public List<Peer> getNewPeers() {
+    return newPeers;
+  }
+
+  /**
+   * Serializes this entry as: a type byte, the current log index and term, then the old and the
+   * new peer lists, each preceded by its size.
+   *
+   * @throws IllegalStateException if writing to the in-memory buffer fails
+   */
+  @Override
+  public ByteBuffer serialize() {
+    ByteArrayOutputStream byteArrayOutputStream =
+        new ByteArrayOutputStream(getDefaultSerializationBufferSize());
+    try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+      // NOTE(review): the type tag written here is EMPTY. If entries are rebuilt from this tag,
+      // the peer lists written below will not be read back; a dedicated config-change type is
+      // probably needed -- confirm against Entry.Types and the entry factory.
+      dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
+      dataOutputStream.writeLong(getCurrLogIndex());
+      dataOutputStream.writeLong(getCurrLogTerm());
+      writePeers(dataOutputStream, oldPeers);
+      writePeers(dataOutputStream, newPeers);
+    } catch (IOException e) {
+      // writing into an in-memory buffer should never fail; if it does, fail loudly instead of
+      // silently returning a truncated buffer
+      throw new IllegalStateException("Failed to serialize ConfigChangeEntry", e);
+    }
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+
+  /**
+   * Restores the index, term and both peer lists from {@code buffer}, in the order written by
+   * {@link #serialize()} after the leading type byte (which this method does not read).
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    setCurrLogIndex(buffer.getLong());
+    setCurrLogTerm(buffer.getLong());
+    oldPeers = readPeers(buffer);
+    newPeers = readPeers(buffer);
+  }
+
+  /** Writes the size of {@code peers} followed by each serialized peer. */
+  private static void writePeers(DataOutputStream out, List<Peer> peers) throws IOException {
+    out.writeInt(peers.size());
+    for (Peer peer : peers) {
+      peer.serialize(out);
+    }
+  }
+
+  /** Reads a size-prefixed list of peers in the format produced by writePeers. */
+  private static List<Peer> readPeers(ByteBuffer buffer) {
+    int size = buffer.getInt();
+    List<Peer> peers = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      peers.add(Peer.deserialize(buffer));
+    }
+    return peers;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java
index 328b74099a..1eb79d1789 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencer.java
@@ -20,8 +20,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.sequencing;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
-import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
/**
* LogSequencer assigns a unique index and associated term to a log entry and offers the entry to a
@@ -36,9 +35,7 @@ public interface LogSequencer {
* @param e a log entry that is not yet indexed.
* @return A SendLogRequest through which the caller can monitor the status of the sending entry.
*/
- VotingLog sequence(Entry e);
-
- void setLogManager(RaftLogManager logManager);
+ VotingEntry sequence(Entry e);
void close();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
index 62c73f6955..ed65209947 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
@@ -25,5 +25,5 @@ import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
public interface LogSequencerFactory {
- LogSequencer create(RaftMember member, RaftLogManager logManager, RaftConfig config);
+ LogSequencer create(RaftMember member, RaftConfig config);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index ab85f0e725..aaba4ae2c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -22,17 +22,15 @@ package org.apache.iotdb.consensus.natraft.protocol.log.sequencing;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
/**
* SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
@@ -42,35 +40,24 @@ public class SynchronousSequencer implements LogSequencer {
private static final Logger logger = LoggerFactory.getLogger(SynchronousSequencer.class);
private RaftMember member;
- private RaftLogManager logManager;
private RaftConfig config;
- public SynchronousSequencer(RaftMember member, RaftLogManager logManager, RaftConfig config) {
+ /**
+ * @param member the raft member being sequenced for; its log manager is looked up on every
+ * sequence call rather than cached here
+ * @param config raft configuration consulted during sequencing
+ */
+ public SynchronousSequencer(RaftMember member, RaftConfig config) {
this.member = member;
- this.logManager = logManager;
this.config = config;
}
- private VotingLog enqueueEntry(VotingLog sendLogRequest) {
- if (member.getAllNodes().size() > 1) {
- member.getLogDispatcher().offer(sendLogRequest);
- }
- return sendLogRequest;
- }
-
- private static AtomicLong indexBlockCounter = new AtomicLong();
+  /**
+   * Appends {@code e} to the member's log after taking the log manager's write lock, wraps it in a
+   * voting entry and hands that entry to {@code LogUtils.enqueueEntry}.
+   *
+   * <p>If both the follower sliding window and weak acceptance are enabled, the entry is enqueued
+   * only at the end of the method, after the loop; otherwise it is enqueued right after it is
+   * appended.
+   *
+   * @param e the entry to sequence
+   * @return the voting entry built for {@code e}
+   */
@Override
-  public VotingLog sequence(Entry e) {
-    VotingLog sendLogRequest = null;
+  public VotingEntry sequence(Entry e) {
+    VotingEntry votingEntry = null;
long startWaitingTime = System.currentTimeMillis();
-
+    // NOTE(review): read from the member on each call instead of a cached field; presumably so a
+    // log manager replaced on the member is picked up without a dedicated setter
+    RaftLogManager logManager = member.getLogManager();
while (true) {
try {
logManager.getLock().writeLock().lock();
-      indexBlockCounter.decrementAndGet();
Entry lastEntry = logManager.getLastEntry();
long lastIndex = lastEntry.getCurrLogIndex();
long lastTerm = lastEntry.getCurrLogTerm();
@@ -85,10 +72,10 @@ public class SynchronousSequencer implements LogSequencer {
// logDispatcher will serialize log, and set log size, and we will use the size after it
logManager.append(Collections.singletonList(e));
-          sendLogRequest = buildSendLogRequest(e);
+          votingEntry = LogUtils.buildVotingLog(e, member);
if (!(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance())) {
-            sendLogRequest = enqueueEntry(sendLogRequest);
+            votingEntry = LogUtils.enqueueEntry(votingEntry, member);
}
break;
}
@@ -108,70 +95,18 @@ public class SynchronousSequencer implements LogSequencer {
}
if (config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance()) {
-      sendLogRequest = enqueueEntry(sendLogRequest);
+      votingEntry = LogUtils.enqueueEntry(votingEntry, member);
}
-    return sendLogRequest;
-  }
-
-  @Override
-  public void setLogManager(RaftLogManager logManager) {
-    this.logManager = logManager;
+    return votingEntry;
}
-  private VotingLog buildSendLogRequest(Entry e) {
-    VotingLog votingLog = member.buildVotingLog(e);
-
-    AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(e, false);
-    votingLog.setAppendEntryRequest(appendEntryRequest);
-
-    return votingLog;
-  }
-
-  public AppendEntryRequest buildAppendEntryRequest(Entry e, boolean serializeNow) {
-    AppendEntryRequest request = buildAppendEntryRequestBasic(e, serializeNow);
-    request = buildAppendEntryRequestExtended(request, e, serializeNow);
-    return request;
-  }
-
-  protected AppendEntryRequest buildAppendEntryRequestBasic(Entry entry, boolean serializeNow) {
-    AppendEntryRequest request = new AppendEntryRequest();
-    request.setTerm(member.getStatus().getTerm().get());
-    if (serializeNow) {
-      ByteBuffer byteBuffer = entry.serialize();
-      entry.setByteSize(byteBuffer.array().length);
-      request.entry = byteBuffer;
-    }
-    try {
-      if (entry.getPrevTerm() != -1) {
-        request.setPrevLogTerm(entry.getPrevTerm());
-      } else {
-        request.setPrevLogTerm(logManager.getTerm(entry.getCurrLogIndex() - 1));
-      }
-    } catch (Exception e) {
-      logger.error("getTerm failed for newly append entries", e);
-    }
-    request.setLeader(member.getThisNode().getEndpoint());
-    request.setLeaderId(member.getThisNode().getNodeId());
-    // don't need lock because even if it's larger than the commitIndex when appending this log to
-    // logManager, the follower can handle the larger commitIndex with no effect
-    request.setLeaderCommit(logManager.getCommitLogIndex());
-    request.setPrevLogIndex(entry.getCurrLogIndex() - 1);
-    request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
-
-    return request;
-  }
-
-  protected AppendEntryRequest buildAppendEntryRequestExtended(
-      AppendEntryRequest request, Entry e, boolean serializeNow) {
-    return request;
-  }
+  /** {@link LogSequencerFactory} that creates {@link SynchronousSequencer} instances. */
public static class Factory implements LogSequencerFactory {
@Override
-  public LogSequencer create(RaftMember member, RaftLogManager logManager, RaftConfig config) {
-    return new SynchronousSequencer(member, logManager, config);
+  public LogSequencer create(RaftMember member, RaftConfig config) {
+    return new SynchronousSequencer(member, config);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
new file mode 100644
index 0000000000..d9f137277f
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/** Static helpers for building and dispatching the voting entries of a {@link RaftMember}. */
+public final class LogUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(LogUtils.class);
+
+  private LogUtils() {
+    // utility class, not meant to be instantiated
+  }
+
+  /**
+   * Wraps {@code e} in a voting entry and attaches the append request that will replicate it.
+   *
+   * <p>The request is built without serializing the entry ({@code serializeNow == false}).
+   *
+   * @param e the entry to replicate
+   * @param member the member that owns the log and creates the voting entry
+   * @return the voting entry created by {@code member.buildVotingLog(e)}, with its request set
+   */
+  public static VotingEntry buildVotingLog(Entry e, RaftMember member) {
+    VotingEntry votingEntry = member.buildVotingLog(e);
+    votingEntry.setAppendEntryRequest(buildAppendEntryRequest(e, false, member));
+    return votingEntry;
+  }
+
+  /**
+   * Builds the {@link AppendEntryRequest} for {@code entry}, with {@code member} as the leader.
+   *
+   * @param entry the entry to send; its predecessor is at index {@code currLogIndex - 1}
+   * @param serializeNow whether to serialize the entry into the request now, which also records
+   *     the entry's byte size
+   * @param member the member sending the request
+   * @return the request; if the previous term cannot be resolved, the failure is logged and no
+   *     previous term is set
+   */
+  public static AppendEntryRequest buildAppendEntryRequest(
+      Entry entry, boolean serializeNow, RaftMember member) {
+    AppendEntryRequest request = new AppendEntryRequest();
+    request.setTerm(member.getStatus().getTerm().get());
+    if (serializeNow) {
+      ByteBuffer byteBuffer = entry.serialize();
+      // remaining() is the serialized length however the buffer is backed; array().length would
+      // count the whole backing array and throws for direct or read-only buffers
+      entry.setByteSize(byteBuffer.remaining());
+      request.entry = byteBuffer;
+    }
+    try {
+      // prefer the term cached on the entry; -1 presumably means it was not recorded
+      if (entry.getPrevTerm() != -1) {
+        request.setPrevLogTerm(entry.getPrevTerm());
+      } else {
+        request.setPrevLogTerm(member.getLogManager().getTerm(entry.getCurrLogIndex() - 1));
+      }
+    } catch (Exception e) {
+      logger.error("getTerm failed for newly append entries", e);
+    }
+    request.setLeader(member.getThisNode().getEndpoint());
+    request.setLeaderId(member.getThisNode().getNodeId());
+    // don't need lock because even if it's larger than the commitIndex when appending this log to
+    // logManager, the follower can handle the larger commitIndex with no effect
+    request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
+    request.setPrevLogIndex(entry.getCurrLogIndex() - 1);
+    request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+    return request;
+  }
+
+  /**
+   * Offers {@code sendLogRequest} to the member's log dispatcher.
+   *
+   * <p>Nothing is offered when {@code member.getAllNodes()} holds at most one node.
+   *
+   * @param sendLogRequest the voting entry to dispatch
+   * @param member the member whose dispatcher is used
+   * @return {@code sendLogRequest} itself
+   */
+  public static VotingEntry enqueueEntry(VotingEntry sendLogRequest, RaftMember member) {
+    if (member.getAllNodes().size() > 1) {
+      member.getLogDispatcher().offer(sendLogRequest);
+    }
+    return sendLogRequest;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
new file mode 100644
index 0000000000..229a9bc81e
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import org.apache.iotdb.consensus.common.Peer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Static helpers for working with lists of {@link Peer}s. */
+public final class NodeUtils {
+
+  private NodeUtils() {
+    // utility class, not meant to be instantiated
+  }
+
+  /**
+   * Returns the peers that appear in {@code newNodes} but not in {@code oldNodes}.
+   *
+   * <p>Membership is tested with {@link List#contains}, i.e. {@code Peer.equals}. The result keeps
+   * the order of {@code newNodes}, including any duplicates it contains.
+   *
+   * @param oldNodes the node list before the change
+   * @param newNodes the node list after the change
+   * @return a new list of the added peers; empty if there are none
+   */
+  public static List<Peer> computeAddedNodes(List<Peer> oldNodes, List<Peer> newNodes) {
+    List<Peer> addedNodes = new ArrayList<>();
+    for (Peer newNode : newNodes) {
+      if (!oldNodes.contains(newNode)) {
+        addedNodes.add(newNode);
+      }
+    }
+    return addedNodes;
+  }
+}