Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:06 UTC
[iotdb] 06/09: merge sequencer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e6c29bcaac1a34971a3922b3c9ef6e351ebacb3e
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:45:26 2021 +0800
merge sequencer
---
.../apache/iotdb/cluster/config/ClusterConfig.java | 3 +-
.../apache/iotdb/cluster/expr/SequencerExpr.java | 339 +++------------------
.../iotdb/cluster/log/manage/RaftLogManager.java | 4 +-
.../log/sequencing/AsynchronousSequencer.java | 33 +-
.../log/sequencing/LogSequencerFactory.java | 1 -
.../log/sequencing/SynchronousSequencer.java | 12 +-
.../cluster/server/member/DataGroupMember.java | 1 -
.../cluster/server/member/MetaGroupMember.java | 1 -
.../iotdb/cluster/server/member/RaftMember.java | 137 +++------
9 files changed, 123 insertions(+), 408 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index c9cbc59..420f7ca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -183,8 +183,7 @@ public class ClusterConfig {
private boolean useIndirectBroadcasting = false;
- private boolean useAsyncSequencing = false;
-
+ private boolean useAsyncSequencing = true;
/**
* create a clusterConfig class. The internalIP will be set according to the server's hostname. If
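For context, flipping this default means a cluster now picks the asynchronous sequencer unless the flag is turned off. The selection happens once, in RaftMember's SEQUENCER_FACTORY initializer shown further down in this patch; a minimal sketch of that selection (the wrapper class name is illustrative only):

    import org.apache.iotdb.cluster.config.ClusterDescriptor;
    import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer;
    import org.apache.iotdb.cluster.log.sequencing.LogSequencerFactory;
    import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;

    class SequencerSelectionSketch {
      // useAsyncSequencing now defaults to true, so this resolves to the
      // AsynchronousSequencer.Factory unless the configuration says otherwise.
      static final LogSequencerFactory SEQUENCER_FACTORY =
          ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
              ? new AsynchronousSequencer.Factory()
              : new SynchronousSequencer.Factory();
    }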
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
index 7dae6a7..4629962 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
@@ -19,23 +19,9 @@
package org.apache.iotdb.cluster.expr;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.exception.LogExecutionException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
-import org.apache.iotdb.cluster.log.LogParser;
-import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -47,265 +33,34 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-import org.apache.iotdb.cluster.utils.IOUtils;
-import org.apache.iotdb.cluster.utils.StatusUtils;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
-import org.apache.iotdb.db.qp.physical.sys.LogPlan;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class SequencerExpr extends MetaGroupMember {
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
- private static final Logger logger = LoggerFactory.getLogger(SequencerExpr.class);
+public class SequencerExpr extends MetaGroupMember {
- private int v2ThreadNum = 2000;
- private int v3ThreadNum = 0000;
+ private int threadNum = 1000;
private AtomicLong reqCnt = new AtomicLong();
- private BlockingQueue<SendLogRequest> nonsequencedLogQueue = new ArrayBlockingQueue<>(
- 4096);
-
public SequencerExpr() {
- LogApplier applier = new LogApplier() {
- @Override
- public void apply(Log log) {
- log.setApplied(true);
- }
-
- @Override
- public void close() {
-
- }
- };
+ LogApplier applier =
+ new LogApplier() {
+ @Override
+ public void apply(Log log) {
+ log.setApplied(true);
+ }
+
+ @Override
+ public void close() {}
+ };
logManager = new MetaSingleSnapshotLogManager(applier, this);
- new Thread(this::sequenceLog).start();
- new Thread(this::sequenceLog).start();
- new Thread(this::sequenceLog).start();
- new Thread(this::sequenceLog).start();
reportThread = Executors.newSingleThreadScheduledExecutor();
reportThread.scheduleAtFixedRate(
this::generateNodeReport, REPORT_INTERVAL_SEC, REPORT_INTERVAL_SEC, TimeUnit.SECONDS);
- }
-
- private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
- logger.debug("{}: Processing plan {}", getName(), plan);
- // assign term and index to the new log and append it
- SendLogRequest sendLogRequest;
-
- Log log;
- if (plan instanceof LogPlan) {
- try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
- } catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
- return StatusUtils.PARSE_LOG_ERROR;
- }
- } else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
- }
-
- if (log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
- logger.error(
- "Log cannot fit into buffer, please increase raft_log_buffer_size;"
- + "or reduce the size of requests you send.");
- return StatusUtils.INTERNAL_ERROR;
- }
-
- long startTime =
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
- synchronized (logManager) {
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
- startTime);
-
- plan.setIndex(logManager.getLastLogIndex() + 1);
- log.setCurrLogTerm(getTerm().get());
- log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
- startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
- // just like processPlanLocally,we need to check the size of log
-
- // logDispatcher will serialize log, and set log size, and we will use the size after it
- logManager.append(log);
- Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
- startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
- sendLogRequest = buildSendLogRequest(log);
- Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
-
- startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
- log.setCreateTime(System.nanoTime());
- votingLogList.insert(sendLogRequest.getVotingLog());
- getLogDispatcher().offer(sendLogRequest);
- Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
- }
-
- try {
- AppendLogResult appendLogResult =
- waitAppendResult(
- sendLogRequest.getVotingLog(),
- sendLogRequest.getLeaderShipStale(),
- sendLogRequest.getNewLeaderTerm(),
- sendLogRequest.getQuorumSize());
- Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
- sendLogRequest.getVotingLog().getLog().getCreateTime());
-
- switch (appendLogResult) {
- case WEAK_ACCEPT:
- // TODO: change to weak
- Statistic.RAFT_WEAK_ACCEPT.add(1);
- return StatusUtils.OK;
- case OK:
- logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
- startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
- commitLog(log);
- Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
- return StatusUtils.OK;
- case TIME_OUT:
- logger.debug("{}: log {} timed out...", getName(), log);
- break;
- case LEADERSHIP_STALE:
- // abort the appending, the new leader will fix the local logs by catch-up
- default:
- break;
- }
- } catch (Exception e) {
- return handleLogExecutionException(log, IOUtils.getRootCause(e));
- }
- return StatusUtils.TIME_OUT;
- }
-
- public SendLogRequest enqueueSendLogRequest(Log log) {
- VotingLog votingLog = buildVotingLog(log);
- AtomicBoolean leaderShipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(term.get());
-
- SendLogRequest sendLogRequest = new SendLogRequest(
- votingLog, leaderShipStale, newLeaderTerm, null, allNodes.size() / 2);
- try {
- nonsequencedLogQueue.put(sendLogRequest);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return sendLogRequest;
- }
-
- private void sequenceLog(List<SendLogRequest> sendLogRequests) {
- long startTime;
- synchronized (logManager) {
- for (SendLogRequest sendLogRequest : sendLogRequests) {
- Log log = sendLogRequest.getVotingLog().getLog();
- log.setCurrLogTerm(getTerm().get());
- log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
- if (log instanceof PhysicalPlanLog) {
- ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex());
- }
-
- startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
- // just like processPlanLocally,we need to check the size of log
-
- // logDispatcher will serialize log, and set log size, and we will use the size after it
- logManager.append(log);
- Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
-
- AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
- sendLogRequest.setAppendEntryRequest(appendEntryRequest);
-
- startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
- log.setCreateTime(System.nanoTime());
- votingLogList.insert(sendLogRequest.getVotingLog());
- getLogDispatcher().offer(sendLogRequest);
- Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
- }
- }
- sendLogRequests.clear();
- }
-
- private void sequenceLog() {
- List<SendLogRequest> sendLogRequests = new ArrayList<>();
- while (!Thread.interrupted()) {
- try {
- synchronized (nonsequencedLogQueue) {
- SendLogRequest request = nonsequencedLogQueue.take();
- sendLogRequests.add(request);
- nonsequencedLogQueue.drainTo(sendLogRequests);
- }
-
- sequenceLog(sendLogRequests);
- } catch (InterruptedException e) {
- return;
- }
- }
- }
-
- private TSStatus processPlanLocallyV3(PhysicalPlan plan) {
- logger.debug("{}: Processing plan {}", getName(), plan);
- // assign term and index to the new log and append it
- SendLogRequest sendLogRequest;
-
- Log log;
- if (plan instanceof LogPlan) {
- try {
- log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
- } catch (UnknownLogTypeException e) {
- logger.error("Can not parse LogPlan {}", plan, e);
- return StatusUtils.PARSE_LOG_ERROR;
- }
- } else {
- log = new PhysicalPlanLog();
- ((PhysicalPlanLog) log).setPlan(plan);
- }
-
- if (log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
- logger.error(
- "Log cannot fit into buffer, please increase raft_log_buffer_size;"
- + "or reduce the size of requests you send.");
- return StatusUtils.INTERNAL_ERROR;
- }
-
- long startTime;
- sendLogRequest = enqueueSendLogRequest(log);
-
- try {
- AppendLogResult appendLogResult =
- waitAppendResult(
- sendLogRequest.getVotingLog(),
- sendLogRequest.getLeaderShipStale(),
- sendLogRequest.getNewLeaderTerm(),
- sendLogRequest.getQuorumSize());
- Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
- sendLogRequest.getVotingLog().getLog().getCreateTime());
-
- switch (appendLogResult) {
- case WEAK_ACCEPT:
- // TODO: change to weak
- Statistic.RAFT_WEAK_ACCEPT.add(1);
- return StatusUtils.OK;
- case OK:
- logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
- startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
- commitLog(log);
- Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
- return StatusUtils.OK;
- case TIME_OUT:
- logger.debug("{}: log {} timed out...", getName(), log);
- break;
- case LEADERSHIP_STALE:
- // abort the appending, the new leader will fix the local logs by catch-up
- default:
- break;
- }
- } catch (Exception e) {
- return handleLogExecutionException(log, IOUtils.getRootCause(e));
- }
- return StatusUtils.TIME_OUT;
+ logSequencer = SEQUENCER_FACTORY.create(this, logManager);
}
@Override
@@ -323,43 +78,36 @@ public class SequencerExpr extends MetaGroupMember {
};
}
- private void decentralizedSequencing() {
- for (int i = 0; i < v2ThreadNum; i++) {
- new Thread(() -> {
- while (true) {
- reqCnt.incrementAndGet();
- DummyPlan dummyPlan = new DummyPlan();
- processPlanLocallyV2(dummyPlan);
- }
- }).start();
- }
- }
-
- private void centralizedSequencing() {
- for (int i = 0; i < v3ThreadNum; i++) {
- new Thread(() -> {
- while (true) {
- reqCnt.incrementAndGet();
- DummyPlan dummyPlan = new DummyPlan();
- processPlanLocallyV3(dummyPlan);
- }
- }).start();
+ private void sequencing() {
+ for (int i = 0; i < threadNum; i++) {
+ new Thread(
+ () -> {
+ while (true) {
+ reqCnt.incrementAndGet();
+ DummyPlan dummyPlan = new DummyPlan();
+ processPlanLocallyV2(dummyPlan);
+ }
+ })
+ .start();
}
}
private void startMonitor() {
- new Thread(() -> {
- long startTime = System.currentTimeMillis();
- while (true) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- long consumedTime = System.currentTimeMillis() - startTime;
- System.out.println("" + consumedTime + ", " + (reqCnt.get() * 1.0 / consumedTime * 1000L));
- }
- }).start();
+ new Thread(
+ () -> {
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long consumedTime = System.currentTimeMillis() - startTime;
+ System.out.println(
+ "" + consumedTime + ", " + (reqCnt.get() * 1.0 / consumedTime * 1000L));
+ }
+ })
+ .start();
}
public static void main(String[] args) {
@@ -372,8 +120,7 @@ public class SequencerExpr extends MetaGroupMember {
group.add(new Node().setNodeIdentifier(i).setMetaPort(i));
}
sequencerExpr.setAllNodes(group);
- sequencerExpr.centralizedSequencing();
- sequencerExpr.decentralizedSequencing();
+ sequencerExpr.sequencing();
sequencerExpr.startMonitor();
}
}
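With the duplicated V2/V3 paths removed, the experiment simply starts threadNum worker threads that push DummyPlans through processPlanLocallyV2 while startMonitor prints throughput once per second. The printed figure is the average rate since the monitor started; a small worked sketch of that computation (the counts below are hypothetical):

    // Values are illustrative; startMonitor reads them from reqCnt and the wall clock.
    long consumedTime = 10_000;                                   // ms since the monitor started
    long requests = 500_000;                                      // plans enqueued so far
    double reqPerSecond = requests * 1.0 / consumedTime * 1000L;  // 50000.0
    System.out.println(consumedTime + ", " + reqPerSecond);       // prints "10000, 50000.0"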
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index ac60e44..4b445df 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -976,6 +976,8 @@ public abstract class RaftLogManager {
while (!Thread.currentThread().isInterrupted()) {
try {
doCheckAppliedLogIndex();
+ } catch (IndexOutOfBoundsException e) {
+ // ignore
} catch (Exception e) {
logger.error("{}, an exception occurred when checking the applied log index", name, e);
}
@@ -998,7 +1000,7 @@ public abstract class RaftLogManager {
}
Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
if (log == null || log.getCurrLogIndex() != nextToCheckIndex) {
- logger.warn(
+ logger.debug(
"{}, get log error when checking the applied log index, log={}, nextToCheckIndex={}",
name,
log,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index 63cc38b..0def4d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -19,16 +19,9 @@
package org.apache.iotdb.cluster.log.sequencing;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -36,9 +29,18 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AsynchronousSequencer implements LogSequencer {
private static final Logger logger = LoggerFactory.getLogger(AsynchronousSequencer.class);
@@ -51,8 +53,7 @@ public class AsynchronousSequencer implements LogSequencer {
private BlockingQueue<SendLogRequest> unsequencedLogQueue;
- public AsynchronousSequencer(RaftMember member,
- RaftLogManager logManager) {
+ public AsynchronousSequencer(RaftMember member, RaftLogManager logManager) {
this.member = member;
this.logManager = logManager;
unsequencedLogQueue = new ArrayBlockingQueue<>(4096);
@@ -62,12 +63,13 @@ public class AsynchronousSequencer implements LogSequencer {
}
public SendLogRequest enqueueSendLogRequest(Log log) {
- AtomicInteger voteCounter = new AtomicInteger(member.getAllNodes().size() / 2);
+ VotingLog votingLog = member.buildVotingLog(log);
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
- SendLogRequest request = new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm,
- null);
+ SendLogRequest request =
+ new SendLogRequest(
+ votingLog, leaderShipStale, newLeaderTerm, null, member.getAllNodes().size() / 2);
try {
unsequencedLogQueue.put(request);
} catch (InterruptedException e) {
@@ -81,7 +83,7 @@ public class AsynchronousSequencer implements LogSequencer {
long startTime;
synchronized (logManager) {
for (SendLogRequest sendLogRequest : sendLogRequests) {
- Log log = sendLogRequest.getLog();
+ Log log = sendLogRequest.getVotingLog().getLog();
log.setCurrLogTerm(member.getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
if (log instanceof PhysicalPlanLog) {
@@ -100,6 +102,7 @@ public class AsynchronousSequencer implements LogSequencer {
startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
log.setCreateTime(System.nanoTime());
+ member.getVotingLogList().insert(sendLogRequest.getVotingLog());
member.getLogDispatcher().offer(sendLogRequest);
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
}
@@ -127,7 +130,7 @@ public class AsynchronousSequencer implements LogSequencer {
@Override
public SendLogRequest sequence(Log log) {
- return null;
+ return enqueueSendLogRequest(log);
}
@Override
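The asynchronous sequencer is now wired end to end: sequence(Log) enqueues the request and returns immediately, and a background sequencing thread drains the queue, assigns term and index under the logManager lock, and hands the batch to the dispatcher. A condensed sketch of that flow, using only calls that appear in this patch:

    // Caller side: receive an unsequenced request back; votes are awaited elsewhere.
    SendLogRequest request = logSequencer.sequence(log);   // == enqueueSendLogRequest(log)

    // Sequencing thread, per request drained from unsequencedLogQueue:
    synchronized (logManager) {
      log.setCurrLogTerm(member.getTerm().get());
      log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
      logManager.append(log);
      request.setAppendEntryRequest(member.buildAppendEntryRequest(log, false));
      member.getVotingLogList().insert(request.getVotingLog());
      member.getLogDispatcher().offer(request);
    }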
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
index 627ef84..cf5b3cc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
@@ -17,7 +17,6 @@
* under the License.
*/
-
package org.apache.iotdb.cluster.log.sequencing;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 4da61b5..f7e6704 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log.sequencing;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -28,7 +29,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -70,6 +70,7 @@ public class SynchronousSequencer implements LogSequencer {
startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
log.setCreateTime(System.nanoTime());
+ member.getVotingLogList().insert(sendLogRequest.getVotingLog());
member.getLogDispatcher().offer(sendLogRequest);
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
}
@@ -82,7 +83,7 @@ public class SynchronousSequencer implements LogSequencer {
}
private SendLogRequest buildSendLogRequest(Log log) {
- AtomicInteger voteCounter = new AtomicInteger(member.getAllNodes().size() / 2);
+ VotingLog votingLog = member.buildVotingLog(log);
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
@@ -90,7 +91,12 @@ public class SynchronousSequencer implements LogSequencer {
AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false);
Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
- return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
+ return new SendLogRequest(
+ votingLog,
+ leaderShipStale,
+ newLeaderTerm,
+ appendEntryRequest,
+ member.getAllNodes().size() / 2);
}
public static class Factory implements LogSequencerFactory {
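The synchronous sequencer performs the same steps inline on the calling thread; the change here is that both sequencers now build their SendLogRequest around a VotingLog plus an explicit quorum size rather than an AtomicInteger vote counter. A small sketch of how that quorum value reads in practice (the group size is hypothetical):

    // For a raft group of 5 nodes, allNodes.size() / 2 == 2: the log is accepted once
    // two followers acknowledge it, since the leader's own copy completes a majority of 3.
    int quorumSize = member.getAllNodes().size() / 2;
    SendLogRequest request =
        new SendLogRequest(
            member.buildVotingLog(log),                  // replaces the old vote counter
            new AtomicBoolean(false),                    // leaderShipStale
            new AtomicLong(member.getTerm().get()),      // newLeaderTerm
            member.buildAppendEntryRequest(log, false),  // built eagerly on the synchronous path
            quorumSize);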
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index a00420f..eeea9d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
-import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 81abb87..9dafebe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
-import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
import org.apache.iotdb.cluster.partition.NodeAdditionResult;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 2835cf3..829e76b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
-import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer;
import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
import org.apache.iotdb.cluster.log.sequencing.LogSequencerFactory;
@@ -124,8 +123,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
/**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so
- * on.
+ * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
*/
@SuppressWarnings("java:S3077") // reference volatile is enough
public abstract class RaftMember {
@@ -137,7 +135,8 @@ public abstract class RaftMember {
public static boolean ENABLE_COMMIT_RETURN = false;
protected static final LogSequencerFactory SEQUENCER_FACTORY =
- ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing() ? new Factory()
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
+ ? new Factory()
: new SynchronousSequencer.Factory();
private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
@@ -158,32 +157,22 @@ public abstract class RaftMember {
* on this may be woken.
*/
private final Object waitLeaderCondition = new Object();
- /**
- * the lock is to make sure that only one thread can apply snapshot at the same time
- */
+ /** the lock is to make sure that only one thread can apply snapshot at the same time */
private final Object snapshotApplyLock = new Object();
private final Object heartBeatWaitObject = new Object();
protected Node thisNode = ClusterConstant.EMPTY_NODE;
- /**
- * the nodes that belong to the same raft group as thisNode.
- */
+ /** the nodes that belong to the same raft group as thisNode. */
protected PartitionGroup allNodes;
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- /**
- * the name of the member, to distinguish several members in the logs.
- */
+ /** the name of the member, to distinguish several members in the logs. */
String name;
- /**
- * to choose nodes to send request of joining cluster randomly.
- */
+ /** to choose nodes to send request of joining cluster randomly. */
Random random = new Random();
- /**
- * when the node is a leader, this map is used to track log progress of each follower.
- */
+ /** when the node is a leader, this map is used to track log progress of each follower. */
Map<Node, Peer> peerMap;
/**
* the current term of the node, this object also works as lock of some transactions of the member
@@ -208,8 +197,8 @@ public abstract class RaftMember {
/** the raft logs are all stored and maintained in the log manager */
protected RaftLogManager logManager;
- /**s
- * the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
+ /**
+ * s the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
* when this node is a leader, or start elections when this node is an elector.
*/
ExecutorService heartBeatService;
@@ -224,9 +213,7 @@ public abstract class RaftMember {
* member by comparing it with the current last log index.
*/
long lastReportedLogIndex;
- /**
- * the thread pool that runs catch-up tasks
- */
+ /** the thread pool that runs catch-up tasks */
private ExecutorService catchUpService;
/**
* lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -262,26 +249,20 @@ public abstract class RaftMember {
* one slow node.
*/
private ExecutorService serialToParallelPool;
- /**
- * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
- */
+ /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
private ExecutorService commitLogPool;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
- * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
- * logs.
+ * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
*/
private LogDispatcher logDispatcher;
- /**
- * If this node can not be the leader, this parameter will be set true.
- */
+ /** If this node can not be the leader, this parameter will be set true. */
private volatile boolean skipElection = false;
/**
- * localExecutor is used to directly execute plans like load configuration in the underlying
- * IoTDB
+ * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
*/
protected PlanExecutor localExecutor;
@@ -842,22 +823,16 @@ public abstract class RaftMember {
return lastCatchUpResponseTime;
}
- /**
- * Sub-classes will add their own process of HeartBeatResponse in this method.
- */
- public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
- }
+ /** Sub-classes will add their own process of HeartBeatResponse in this method. */
+ public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
- /**
- * The actions performed when the node wins in an election (becoming a leader).
- */
- public void onElectionWins() {
- }
+ /** The actions performed when the node wins in an election (becoming a leader). */
+ public void onElectionWins() {}
/**
* Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
- * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
- * a part of data is in the snapshot, then it is not in the logs.
+ * follower. If some of the required logs are removed, also send the snapshot. <br>
+ * notice that if a part of data is in the snapshot, then it is not in the logs.
*/
public void catchUp(Node follower, long lastLogIdx) {
// for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -944,9 +919,7 @@ public abstract class RaftMember {
}
}
- /**
- * call back after syncLeader
- */
+ /** call back after syncLeader */
public interface CheckConsistency {
/**
@@ -955,7 +928,7 @@ public abstract class RaftMember {
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
* @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
- * implements.
+ * implements.
*/
void postCheckConsistency(long leaderCommitId, long localAppliedId)
throws CheckConsistencyException;
@@ -964,7 +937,8 @@ public abstract class RaftMember {
public static class MidCheckConsistency implements CheckConsistency {
/**
- * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION
+ * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
+ * CHECK_MID_CONSISTENCY_EXCEPTION
*
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
@@ -976,7 +950,7 @@ public abstract class RaftMember {
if (leaderCommitId == Long.MAX_VALUE
|| leaderCommitId == Long.MIN_VALUE
|| leaderCommitId - localAppliedId
- > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+ > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
}
}
@@ -1009,7 +983,7 @@ public abstract class RaftMember {
* @param checkConsistency check after syncleader
* @return true if the node has caught up, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
if (character == NodeCharacter.LEADER) {
@@ -1028,9 +1002,7 @@ public abstract class RaftMember {
return waitUntilCatchUp(checkConsistency);
}
- /**
- * Wait until the leader of this node becomes known or time out.
- */
+ /** Wait until the leader of this node becomes known or time out. */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -1057,7 +1029,7 @@ public abstract class RaftMember {
*
* @return true if this node has caught up before timeout, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
@@ -1090,7 +1062,7 @@ public abstract class RaftMember {
* sync local applyId to leader commitId
*
* @param leaderCommitId leader commit id
- * @param fastFail if enable, when log differ too much, return false directly.
+ * @param fastFail if enable, when log differ too much, return false directly.
* @return true if leaderCommitId <= localAppliedId
*/
public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
@@ -1144,7 +1116,7 @@ public abstract class RaftMember {
* call this method. Will commit the log locally and send it to followers
*
* @return OK if over half of the followers accept the log or null if the leadership is lost
- * during the appending
+ * during the appending
*/
public TSStatus processPlanLocally(PhysicalPlan plan) {
if (USE_LOG_DISPATCHER) {
@@ -1198,7 +1170,7 @@ public abstract class RaftMember {
return StatusUtils.TIME_OUT;
}
- private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+ protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
logger.debug("{}: Processing plan {}", name, plan);
if (readOnly) {
return StatusUtils.NODE_READ_ONLY;
@@ -1277,7 +1249,7 @@ public abstract class RaftMember {
votingLog, leaderShipStale, newLeaderTerm, appendEntryRequest, allNodes.size() / 2);
}
- protected VotingLog buildVotingLog(Log log) {
+ public VotingLog buildVotingLog(Log log) {
return new VotingLog(log, allNodes.size());
}
@@ -1369,9 +1341,7 @@ public abstract class RaftMember {
return peerMap;
}
- /**
- * @return true if there is a log whose index is "index" and term is "term", false otherwise
- */
+ /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1390,18 +1360,15 @@ public abstract class RaftMember {
return syncLock;
}
- /**
- * Sub-classes will add their own process of HeartBeatRequest in this method.
- */
- void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
- }
+ /** Sub-classes will add their own process of HeartBeatRequest in this method. */
+ void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
/**
* Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
* request is valid.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkElectorLogProgress(ElectionRequest electionRequest) {
@@ -1445,7 +1412,7 @@ public abstract class RaftMember {
* lastLogIndex is smaller than the voter's Otherwise accept the election.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkLogProgress(long lastLogIndex, long lastLogTerm) {
long response;
@@ -1462,10 +1429,10 @@ public abstract class RaftMember {
/**
* Forward a non-query plan to a node using the default client.
*
- * @param plan a non-query plan
- * @param node cannot be the local node
+ * @param plan a non-query plan
+ * @param node cannot be the local node
* @param header must be set for data group communication, set to null for meta group
- * communication
+ * communication
* @return a TSStatus indicating if the forwarding is successful.
*/
public TSStatus forwardPlan(PhysicalPlan plan, Node node, RaftNode header) {
@@ -1496,7 +1463,7 @@ public abstract class RaftMember {
/**
* Forward a non-query plan to "receiver" using "client".
*
- * @param plan a non-query plan
+ * @param plan a non-query plan
* @param header to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
@@ -1578,7 +1545,7 @@ public abstract class RaftMember {
* Get an asynchronous thrift client of the given node.
*
* @return an asynchronous thrift client or null if the caller tries to connect the local node or
- * the node cannot be reached.
+ * the node cannot be reached.
*/
public AsyncClient getAsyncClient(Node node) {
return getAsyncClient(node, asyncClientPool, true);
@@ -1812,7 +1779,7 @@ public abstract class RaftMember {
* heartbeat timer.
*
* @param fromLeader true if the request is from a leader, false if the request is from an
- * elector.
+ * elector.
*/
public void stepDown(long newTerm, boolean fromLeader) {
synchronized (term) {
@@ -1844,9 +1811,7 @@ public abstract class RaftMember {
this.thisNode = thisNode;
}
- /**
- * @return the header of the data raft group or null if this is in a meta group.
- */
+ /** @return the header of the data raft group or null if this is in a meta group. */
public RaftNode getHeader() {
return null;
}
@@ -2005,9 +1970,7 @@ public abstract class RaftMember {
log, node, leaderShipStale, newLeaderTerm, request, quorumSize, Collections.emptyList());
}
- /**
- * Send "log" to "node".
- */
+ /** Send "log" to "node". */
public void sendLogToFollower(
VotingLog log,
Node node,
@@ -2140,7 +2103,7 @@ public abstract class RaftMember {
* and append "log" to it. Otherwise report a log mismatch.
*
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
protected AppendEntryResult appendEntry(
long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
@@ -2170,9 +2133,7 @@ public abstract class RaftMember {
return result;
}
- /**
- * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
- */
+ /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
@@ -2218,7 +2179,7 @@ public abstract class RaftMember {
*
* @param logs append logs
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
protected AppendEntryResult appendEntries(
long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
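Besides the javadoc reflow, the functional changes in RaftMember are the visibility widening of processPlanLocallyV2 (private to protected) and buildVotingLog (protected to public), which is what lets the slimmed-down SequencerExpr above and the sequencer classes reuse the shared path instead of copying it. A minimal sketch of what a subclass can now do (the subclass name is illustrative):

    // Hypothetical subclass: drives the common V2 sequencing path directly.
    public class SequencingDriver extends MetaGroupMember {
      TSStatus drive(PhysicalPlan plan) {
        return processPlanLocallyV2(plan);  // protected as of this patch
      }
    }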