You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/11/01 06:21:38 UTC
[iotdb] branch expr updated: before add log sequencer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new f505f20 before add log sequencer
f505f20 is described below
commit f505f201b5b4d8a0906c2e1cfbda5e5da76ab855
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 14:21:07 2021 +0800
before add log sequencer
---
cluster/distribute-dc.sh | 2 +-
.../org/apache/iotdb/cluster/expr/ExprBench.java | 2 +
.../org/apache/iotdb/cluster/expr/ExprMember.java | 1 +
.../apache/iotdb/cluster/expr/SequencerExpr.java | 379 +++++++++++++++++++++
.../apache/iotdb/cluster/expr/VotingLogList.java | 32 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 3 +-
.../org/apache/iotdb/cluster/log/VotingLog.java | 2 +
.../cluster/partition/slot/SlotPartitionTable.java | 16 +-
.../handlers/caller/AppendNodeEntryHandler.java | 4 +
.../server/handlers/caller/ElectionHandler.java | 2 +-
.../server/heartbeat/MetaHeartbeatThread.java | 9 +-
.../cluster/server/member/DataGroupMember.java | 10 +-
.../cluster/server/member/MetaGroupMember.java | 33 +-
.../iotdb/cluster/server/member/RaftMember.java | 81 +++--
14 files changed, 516 insertions(+), 60 deletions(-)
diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 279f9ec..1eba6f4 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,6 +1,6 @@
src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
-ips=(dc15 dc16 dc17 dc18)
+ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
target_lib_path=/home/jt/iotdb_expr/lib
for ip in ${ips[*]}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 86ec5e6..e3f7b57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.expr;
import org.apache.iotdb.cluster.client.sync.SyncClientFactory;
import org.apache.iotdb.cluster.client.sync.SyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -107,6 +108,7 @@ public class ExprBench {
}
public static void main(String[] args) {
+ ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
Node target = new Node();
target.setInternalIp(args[0]);
target.setMetaPort(Integer.parseInt(args[1]));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 9e07028..050230b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -205,6 +205,7 @@ public class ExprMember extends MetaGroupMember {
// flush [0, flushPos)
List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+ // logger.info("{}, Flushing {} into log manager", logManager.getLastLogIndex(), logs);
long success =
logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
if (success != -1) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
new file mode 100644
index 0000000..7dae6a7
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.LogExecutionException;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.VotingLog;
+import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.IOUtils;
+import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequencerExpr extends MetaGroupMember {
+
+ private static final Logger logger = LoggerFactory.getLogger(SequencerExpr.class);
+
+ private int v2ThreadNum = 2000;
+ private int v3ThreadNum = 0000;
+ private AtomicLong reqCnt = new AtomicLong();
+
+ private BlockingQueue<SendLogRequest> nonsequencedLogQueue = new ArrayBlockingQueue<>(
+ 4096);
+
+ public SequencerExpr() {
+ LogApplier applier = new LogApplier() {
+ @Override
+ public void apply(Log log) {
+ log.setApplied(true);
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ logManager = new MetaSingleSnapshotLogManager(applier, this);
+
+ new Thread(this::sequenceLog).start();
+ new Thread(this::sequenceLog).start();
+ new Thread(this::sequenceLog).start();
+ new Thread(this::sequenceLog).start();
+ reportThread = Executors.newSingleThreadScheduledExecutor();
+ reportThread.scheduleAtFixedRate(
+ this::generateNodeReport, REPORT_INTERVAL_SEC, REPORT_INTERVAL_SEC, TimeUnit.SECONDS);
+ }
+
+ private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+ logger.debug("{}: Processing plan {}", getName(), plan);
+ // assign term and index to the new log and append it
+ SendLogRequest sendLogRequest;
+
+ Log log;
+ if (plan instanceof LogPlan) {
+ try {
+ log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ } catch (UnknownLogTypeException e) {
+ logger.error("Can not parse LogPlan {}", plan, e);
+ return StatusUtils.PARSE_LOG_ERROR;
+ }
+ } else {
+ log = new PhysicalPlanLog();
+ ((PhysicalPlanLog) log).setPlan(plan);
+ }
+
+ if (log.serialize().capacity() + Integer.BYTES
+ >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ logger.error(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "or reduce the size of requests you send.");
+ return StatusUtils.INTERNAL_ERROR;
+ }
+
+ long startTime =
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
+ synchronized (logManager) {
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
+ startTime);
+
+ plan.setIndex(logManager.getLastLogIndex() + 1);
+ log.setCurrLogTerm(getTerm().get());
+ log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+
+ startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+ // just like processPlanLocally, we need to check the size of log
+
+ // logDispatcher will serialize log, and set log size, and we will use the size after it
+ logManager.append(log);
+ Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
+
+ startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
+ sendLogRequest = buildSendLogRequest(log);
+ Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
+
+ startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+ log.setCreateTime(System.nanoTime());
+ votingLogList.insert(sendLogRequest.getVotingLog());
+ getLogDispatcher().offer(sendLogRequest);
+ Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+ }
+
+ try {
+ AppendLogResult appendLogResult =
+ waitAppendResult(
+ sendLogRequest.getVotingLog(),
+ sendLogRequest.getLeaderShipStale(),
+ sendLogRequest.getNewLeaderTerm(),
+ sendLogRequest.getQuorumSize());
+ Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+ sendLogRequest.getVotingLog().getLog().getCreateTime());
+
+ switch (appendLogResult) {
+ case WEAK_ACCEPT:
+ // TODO: change to weak
+ Statistic.RAFT_WEAK_ACCEPT.add(1);
+ return StatusUtils.OK;
+ case OK:
+ logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
+ startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
+ commitLog(log);
+ Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+ return StatusUtils.OK;
+ case TIME_OUT:
+ logger.debug("{}: log {} timed out...", getName(), log);
+ break;
+ case LEADERSHIP_STALE:
+ // abort the appending, the new leader will fix the local logs by catch-up
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ return handleLogExecutionException(log, IOUtils.getRootCause(e));
+ }
+ return StatusUtils.TIME_OUT;
+ }
+
+ public SendLogRequest enqueueSendLogRequest(Log log) {
+ VotingLog votingLog = buildVotingLog(log);
+ AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+ AtomicLong newLeaderTerm = new AtomicLong(term.get());
+
+ SendLogRequest sendLogRequest = new SendLogRequest(
+ votingLog, leaderShipStale, newLeaderTerm, null, allNodes.size() / 2);
+ try {
+ nonsequencedLogQueue.put(sendLogRequest);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return sendLogRequest;
+ }
+
+ private void sequenceLog(List<SendLogRequest> sendLogRequests) {
+ long startTime;
+ synchronized (logManager) {
+ for (SendLogRequest sendLogRequest : sendLogRequests) {
+ Log log = sendLogRequest.getVotingLog().getLog();
+ log.setCurrLogTerm(getTerm().get());
+ log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+ if (log instanceof PhysicalPlanLog) {
+ ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex());
+ }
+
+ startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+ // just like processPlanLocally, we need to check the size of log
+
+ // logDispatcher will serialize log, and set log size, and we will use the size after it
+ logManager.append(log);
+ Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
+
+ AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
+ sendLogRequest.setAppendEntryRequest(appendEntryRequest);
+
+ startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+ log.setCreateTime(System.nanoTime());
+ votingLogList.insert(sendLogRequest.getVotingLog());
+ getLogDispatcher().offer(sendLogRequest);
+ Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+ }
+ }
+ sendLogRequests.clear();
+ }
+
+ private void sequenceLog() {
+ List<SendLogRequest> sendLogRequests = new ArrayList<>();
+ while (!Thread.interrupted()) {
+ try {
+ synchronized (nonsequencedLogQueue) {
+ SendLogRequest request = nonsequencedLogQueue.take();
+ sendLogRequests.add(request);
+ nonsequencedLogQueue.drainTo(sendLogRequests);
+ }
+
+ sequenceLog(sendLogRequests);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+
+ private TSStatus processPlanLocallyV3(PhysicalPlan plan) {
+ logger.debug("{}: Processing plan {}", getName(), plan);
+ // assign term and index to the new log and append it
+ SendLogRequest sendLogRequest;
+
+ Log log;
+ if (plan instanceof LogPlan) {
+ try {
+ log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ } catch (UnknownLogTypeException e) {
+ logger.error("Can not parse LogPlan {}", plan, e);
+ return StatusUtils.PARSE_LOG_ERROR;
+ }
+ } else {
+ log = new PhysicalPlanLog();
+ ((PhysicalPlanLog) log).setPlan(plan);
+ }
+
+ if (log.serialize().capacity() + Integer.BYTES
+ >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ logger.error(
+ "Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ + "or reduce the size of requests you send.");
+ return StatusUtils.INTERNAL_ERROR;
+ }
+
+ long startTime;
+ sendLogRequest = enqueueSendLogRequest(log);
+
+ try {
+ AppendLogResult appendLogResult =
+ waitAppendResult(
+ sendLogRequest.getVotingLog(),
+ sendLogRequest.getLeaderShipStale(),
+ sendLogRequest.getNewLeaderTerm(),
+ sendLogRequest.getQuorumSize());
+ Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+ sendLogRequest.getVotingLog().getLog().getCreateTime());
+
+ switch (appendLogResult) {
+ case WEAK_ACCEPT:
+ // TODO: change to weak
+ Statistic.RAFT_WEAK_ACCEPT.add(1);
+ return StatusUtils.OK;
+ case OK:
+ logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
+ startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
+ commitLog(log);
+ Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+ return StatusUtils.OK;
+ case TIME_OUT:
+ logger.debug("{}: log {} timed out...", getName(), log);
+ break;
+ case LEADERSHIP_STALE:
+ // abort the appending, the new leader will fix the local logs by catch-up
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ return handleLogExecutionException(log, IOUtils.getRootCause(e));
+ }
+ return StatusUtils.TIME_OUT;
+ }
+
+ @Override
+ public Client getSyncClient(Node node) {
+ return new Client(null, null) {
+ @Override
+ public AppendEntryResult appendEntry(AppendEntryRequest request) {
+ return new AppendEntryResult().setStatus(Response.RESPONSE_STRONG_ACCEPT);
+ }
+
+ @Override
+ public AppendEntryResult appendEntries(AppendEntriesRequest request) {
+ return new AppendEntryResult().setStatus(Response.RESPONSE_STRONG_ACCEPT);
+ }
+ };
+ }
+
+ private void decentralizedSequencing() {
+ for (int i = 0; i < v2ThreadNum; i++) {
+ new Thread(() -> {
+ while (true) {
+ reqCnt.incrementAndGet();
+ DummyPlan dummyPlan = new DummyPlan();
+ processPlanLocallyV2(dummyPlan);
+ }
+ }).start();
+ }
+ }
+
+ private void centralizedSequencing() {
+ for (int i = 0; i < v3ThreadNum; i++) {
+ new Thread(() -> {
+ while (true) {
+ reqCnt.incrementAndGet();
+ DummyPlan dummyPlan = new DummyPlan();
+ processPlanLocallyV3(dummyPlan);
+ }
+ }).start();
+ }
+ }
+
+ private void startMonitor() {
+ new Thread(() -> {
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long consumedTime = System.currentTimeMillis() - startTime;
+ System.out.println("" + consumedTime + ", " + (reqCnt.get() * 1.0 / consumedTime * 1000L));
+ }
+ }).start();
+ }
+
+ public static void main(String[] args) {
+ RaftMember.USE_LOG_DISPATCHER = true;
+ ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false);
+ SequencerExpr sequencerExpr = new SequencerExpr();
+ sequencerExpr.setCharacter(NodeCharacter.LEADER);
+ PartitionGroup group = new PartitionGroup();
+ for (int i = 0; i < 3; i++) {
+ group.add(new Node().setNodeIdentifier(i).setMetaPort(i));
+ }
+ sequencerExpr.setAllNodes(group);
+ sequencerExpr.centralizedSequencing();
+ sequencerExpr.decentralizedSequencing();
+ sequencerExpr.startMonitor();
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 149e726..28eb19d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -21,10 +21,14 @@ package org.apache.iotdb.cluster.expr;
import org.apache.iotdb.cluster.log.VotingLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
public class VotingLogList {
+ private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
private List<VotingLog> logList = new ArrayList<>();
private volatile long currTerm = -1;
@@ -58,28 +62,36 @@ public class VotingLogList {
* @param acceptingNodeId
* @return the lastly removed entry if any.
*/
- public synchronized void onStronglyAccept(long index, long term, int acceptingNodeId) {
+ public void onStronglyAccept(long index, long term, int acceptingNodeId) {
int lastEntryIndexToCommit = -1;
- for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
- VotingLog votingLog = logList.get(i);
- if (votingLog.getLog().getCurrLogIndex() <= index
- && votingLog.getLog().getCurrLogTerm() == term) {
- votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
- if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
- lastEntryIndexToCommit = i;
+
+ List<VotingLog> acceptedLogs;
+ synchronized (this) {
+ for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
+ VotingLog votingLog = logList.get(i);
+ if (votingLog.getLog().getCurrLogIndex() <= index
+ && votingLog.getLog().getCurrLogTerm() == term) {
+ votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
+ if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
+ lastEntryIndexToCommit = i;
+ }
+ } else if (votingLog.getLog().getCurrLogIndex() > index) {
+ break;
}
}
+
+ List<VotingLog> tmpAcceptedLogs = logList.subList(0, lastEntryIndexToCommit + 1);
+ acceptedLogs = new ArrayList<>(tmpAcceptedLogs);
+ tmpAcceptedLogs.clear();
}
if (lastEntryIndexToCommit != -1) {
- List<VotingLog> acceptedLogs = logList.subList(0, lastEntryIndexToCommit + 1);
for (VotingLog acceptedLog : acceptedLogs) {
synchronized (acceptedLog) {
acceptedLog.acceptedTime = System.nanoTime();
acceptedLog.notifyAll();
}
}
- acceptedLogs.clear();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index d954ccf..6c3d2073 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.TestOnly;
@@ -83,7 +84,7 @@ public class LogDispatcher {
void createQueueAndBindingThreads() {
for (Node node : member.getAllNodes()) {
- if (!node.equals(member.getThisNode())) {
+ if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
nodeLogQueues.add(createQueueAndBindingThread(node));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index 555562a..d56f3f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.log;
+import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
@@ -27,6 +28,7 @@ public class VotingLog {
protected Set<Integer> stronglyAcceptedNodeIds;
protected Set<Integer> weaklyAcceptedNodeIds;
public long acceptedTime;
+ public volatile ByteBuffer serializedCache;
public VotingLog(Log log, int groupSize) {
this.log = log;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 7a46baa..fcb70cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -190,7 +190,7 @@ public class SlotPartitionTable implements PartitionTable {
private List<PartitionGroup> getPartitionGroups(Node node) {
List<PartitionGroup> ret = new ArrayList<>();
- int nodeIndex = nodeRing.indexOf(node);
+ int nodeIndex = findNodeIndex(node);
if (nodeIndex == -1) {
logger.info("PartitionGroups is empty due to this node has been removed from the cluster!");
return ret;
@@ -215,7 +215,7 @@ public class SlotPartitionTable implements PartitionTable {
PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
// assuming the nodes are [1,2,3,4,5]
- int nodeIndex = nodeRing.indexOf(raftNode.getNode());
+ int nodeIndex = findNodeIndex(raftNode.getNode());
if (nodeIndex == -1) {
logger.warn("Node {} is not in the cluster", raftNode.getNode());
return null;
@@ -529,7 +529,7 @@ public class SlotPartitionTable implements PartitionTable {
localGroups.remove(removedGroupIdx);
// each node exactly joins replicationNum groups, so when a group is removed, the node
// should join a new one
- int thisNodeIdx = nodeRing.indexOf(thisNode);
+ int thisNodeIdx = findNodeIndex(thisNode);
// check if this node is to be removed
if (thisNodeIdx == -1) {
@@ -606,4 +606,14 @@ public class SlotPartitionTable implements PartitionTable {
public RaftNode[] getSlotNodes() {
return slotNodes;
}
+
+ private int findNodeIndex(Node node) {
+ for (int i = 0; i < nodeRing.size(); i++) {
+ if (nodeRing.get(i).getInternalIp().equals(node.getInternalIp())
+ && nodeRing.get(i).getMetaPort() == node.getMetaPort()) {
+ return i;
+ }
+ }
+ return -1;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index f865ec8..73088d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -114,6 +114,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
} else if (resp == RESPONSE_WEAK_ACCEPT) {
synchronized (log) {
+ if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
+ >= quorumSize) {
+ log.acceptedTime = System.nanoTime();
+ }
log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
log.notifyAll();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
index 6190d20..cbb5a08 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
@@ -96,8 +96,8 @@ public class ElectionHandler implements AsyncMethodCallback<Long> {
// the election is valid
electionValid.set(true);
terminated.set(true);
- raftMember.getTerm().notifyAll();
raftMember.onElectionWins();
+ raftMember.getTerm().notifyAll();
logger.info("{}: Election {} is won", memberName, currTerm);
}
// still need more votes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 3eb510d..cb28a64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -86,6 +86,7 @@ public class MetaHeartbeatThread extends HeartbeatThread {
super.startElection();
if (localMetaMember.getCharacter() == NodeCharacter.LEADER) {
+
// A new raft leader needs to have at least one log in its term for committing logs with older
// terms.
// In the meta group, log frequency is very low. When the leader is changed while changing
@@ -93,7 +94,13 @@ public class MetaHeartbeatThread extends HeartbeatThread {
// operation can be carried out in time.
localMetaMember
.getAppendLogThreadPool()
- .submit(() -> localMetaMember.processPlanLocally(new DummyPlan()));
+ .submit(
+ () -> {
+ while (localMetaMember.getPartitionTable() == null) {
+ // wait until partition table is ready
+ }
+ localMetaMember.processPlanLocally(new DummyPlan());
+ });
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index e58d148..eadc017 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -266,7 +266,7 @@ public class DataGroupMember extends RaftMember {
long checkElectorLogProgress(ElectionRequest electionRequest) {
Node elector = electionRequest.getElector();
// check if the node is in the group
- if (!allNodes.contains(elector)) {
+ if (!containsNode(elector)) {
logger.info(
"{}: the elector {} is not in the data group {}, so reject this election.",
name,
@@ -316,7 +316,7 @@ public class DataGroupMember extends RaftMember {
logger.debug("{}: start to pre adding node {}", name, node);
}
synchronized (allNodes) {
- if (allNodes.contains(node)) {
+ if (containsNode(node)) {
return false;
}
int insertIndex = -1;
@@ -367,7 +367,7 @@ public class DataGroupMember extends RaftMember {
synchronized (allNodes) {
preAddNode(node);
- if (allNodes.contains(node) && allNodes.size() > config.getReplicationNum()) {
+ if (containsNode(node) && allNodes.size() > config.getReplicationNum()) {
// remove the last node because the group size is fixed to replication number
Node removedNode = allNodes.remove(allNodes.size() - 1);
peerMap.remove(removedNode);
@@ -895,7 +895,7 @@ public class DataGroupMember extends RaftMember {
logger.debug("{}: start to pre remove node {}", name, removedNode);
}
synchronized (allNodes) {
- if (allNodes.contains(removedNode) && allNodes.size() == config.getReplicationNum()) {
+ if (containsNode(removedNode) && allNodes.size() == config.getReplicationNum()) {
// update the group if the deleted node was in it
PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
if (newGroup == null) {
@@ -933,7 +933,7 @@ public class DataGroupMember extends RaftMember {
synchronized (allNodes) {
preRemoveNode(removedNode);
- if (allNodes.contains(removedNode)) {
+ if (containsNode(removedNode)) {
// update the group if the deleted node was in it
allNodes.remove(removedNode);
peerMap.remove(removedNode);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7919795..e30f867 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -169,7 +169,7 @@ public class MetaGroupMember extends RaftMember {
* every "REPORT_INTERVAL_SEC" seconds, a reporter thread will print the status of all raft
* members in this node
*/
- private static final int REPORT_INTERVAL_SEC = 10;
+ protected static final int REPORT_INTERVAL_SEC = 10;
/**
* during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
@@ -219,7 +219,7 @@ public class MetaGroupMember extends RaftMember {
* a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread" will print the status
* of all raft members in this node
*/
- private ScheduledExecutorService reportThread;
+ protected ScheduledExecutorService reportThread;
/**
* containing configurations that should be kept the same cluster-wide, and must be checked before
@@ -397,7 +397,7 @@ public class MetaGroupMember extends RaftMember {
if (node != null
&& (!node.getInternalIp().equals(thisNode.internalIp)
|| node.getMetaPort() != thisNode.getMetaPort())
- && !allNodes.contains(node)) {
+ && !containsNode(node)) {
// do not add the local node since it is added in the constructor
allNodes.add(node);
}
@@ -417,7 +417,7 @@ public class MetaGroupMember extends RaftMember {
logger.debug("{}: adding a new node {} into {}", name, newNode, allNodes);
}
- if (!allNodes.contains(newNode)) {
+ if (!containsNode(newNode)) {
registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
allNodes.add(newNode);
}
@@ -473,7 +473,7 @@ public class MetaGroupMember extends RaftMember {
TimeUnit.SECONDS);
}
- private void generateNodeReport() {
+ protected void generateNodeReport() {
try {
if (logger.isInfoEnabled()) {
NodeReport report = genNodeReport();
@@ -619,7 +619,7 @@ public class MetaGroupMember extends RaftMember {
long checkElectorLogProgress(ElectionRequest electionRequest) {
Node elector = electionRequest.getElector();
// check if the node is in the group
- if (partitionTable != null && !allNodes.contains(elector)) {
+ if (partitionTable != null && !containsNode(elector)) {
logger.info(
"{}: the elector {} is not in the data group {}, so reject this election.",
name,
@@ -723,7 +723,20 @@ public class MetaGroupMember extends RaftMember {
if (response.isSetFollowerIdentifier()) {
// register the follower, the response.getFollower() contains the node information of the
// receiver.
- registerNodeIdentifier(response.getFollower(), response.getFollowerIdentifier());
+ Node localNode = null;
+ for (Node node : allNodes) {
+ if (node.getInternalIp().equals(response.getFollower().internalIp)
+ && node.getMetaPort() == response.getFollower().getMetaPort()) {
+ localNode = node;
+ }
+ }
+ if (localNode == null) {
+ logger.warn(
+ "Received a heartbeat response from a node that is not in the node list: {}",
+ response.getFollower());
+ return;
+ }
+ registerNodeIdentifier(localNode, response.getFollowerIdentifier());
// if all nodes' ids are known, we can build the partition table
if (allNodesIdKnown()) {
// When the meta raft group is established, the follower reports its node information to the
@@ -902,7 +915,7 @@ public class MetaGroupMember extends RaftMember {
break;
}
}
- if (allNodes.contains(newNode)) {
+ if (containsNode(newNode)) {
logger.debug("Node {} is already in the cluster", newNode);
response.setRespNum((int) Response.RESPONSE_AGREE);
synchronized (partitionTable) {
@@ -1741,7 +1754,7 @@ public class MetaGroupMember extends RaftMember {
logger.debug("{}: Removing a node {} from {}", name, oldNode, allNodes);
}
- if (allNodes.contains(oldNode)) {
+ if (containsNode(oldNode)) {
allNodes.remove(oldNode);
idNodeMap.remove(oldNode.nodeIdentifier);
}
@@ -1857,7 +1870,7 @@ public class MetaGroupMember extends RaftMember {
private NodeReport genNodeReport() {
NodeReport report = new NodeReport(thisNode);
report.setMetaMemberReport(genMemberReport());
- report.setDataMemberReportList(dataClusterServer.genMemberReports());
+ // report.setDataMemberReportList(dataClusterServer.genMemberReports());
return report;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 564eb98..41b5e50 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -168,7 +169,7 @@ public abstract class RaftMember {
* the current term of the node, this object also works as lock of some transactions of the member
* like elections.
*/
- AtomicLong term = new AtomicLong(0);
+ protected AtomicLong term = new AtomicLong(0);
volatile NodeCharacter character = NodeCharacter.ELECTOR;
AtomicReference<Node> leader = new AtomicReference<>(ClusterConstant.EMPTY_NODE);
@@ -787,6 +788,19 @@ public abstract class RaftMember {
public void setAllNodes(PartitionGroup allNodes) {
this.allNodes = allNodes;
this.votingLogList = new VotingLogList(allNodes.size() / 2);
+
+ // update the reference of thisNode to keep consistency
+ boolean foundThisNode = false;
+ for (Node node : allNodes) {
+ if (ClusterUtils.isNodeEquals(node, thisNode)) {
+ thisNode = node;
+ foundThisNode = true;
+ break;
+ }
+ }
+ if (!foundThisNode) {
+ logger.error("{}: did not find this node {}, in the raft group {}", name, thisNode, allNodes);
+ }
}
public Map<Node, Long> getLastCatchUpResponseTime() {
@@ -1192,10 +1206,9 @@ public abstract class RaftMember {
startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
log.setCreateTime(System.nanoTime());
+ votingLogList.insert(sendLogRequest.getVotingLog());
getLogDispatcher().offer(sendLogRequest);
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
-
- votingLogList.insert(sendLogRequest.getVotingLog());
}
try {
@@ -1586,7 +1599,7 @@ public abstract class RaftMember {
return term;
}
- private synchronized LogDispatcher getLogDispatcher() {
+ protected synchronized LogDispatcher getLogDispatcher() {
if (logDispatcher == null) {
if (USE_INDIRECT_LOG_DISPATCHER) {
logDispatcher = new IndirectLogDispatcher(this);
@@ -1602,23 +1615,24 @@ public abstract class RaftMember {
* one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
*/
@SuppressWarnings({"java:S2445"}) // safe synchronized
- private AppendLogResult waitAppendResult(
+ protected AppendLogResult waitAppendResult(
VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
// wait for the followers to vote
long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
long nextTimeToPrint = 3000;
+
+ int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+ int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+ int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
synchronized (log) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
- int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
- int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
while (stronglyAcceptedNodeNum < quorumSize
+ && (!ENABLE_WEAK_ACCEPTANCE || (totalAccepted < quorumSize))
&& alreadyWait < RaftServer.getWriteOperationTimeoutMS()
- && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)
- && (!ENABLE_WEAK_ACCEPTANCE
- || (stronglyAcceptedNodeNum + weaklyAcceptedNodeNum < quorumSize))) {
+ && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
try {
- log.wait(1);
+ log.wait(0);
logger.debug("{} ends waiting", log);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -1636,6 +1650,7 @@ public abstract class RaftMember {
}
stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+ totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
}
if (alreadyWait > 3000) {
@@ -1663,15 +1678,11 @@ public abstract class RaftMember {
}
// cannot get enough agreements within a certain amount of time
- if (log.getStronglyAcceptedNodeIds().size() < quorumSize
- && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size())
- < quorumSize) {
+ if (stronglyAcceptedNodeNum < quorumSize && totalAccepted < quorumSize) {
return AppendLogResult.TIME_OUT;
}
- if (log.getStronglyAcceptedNodeIds().size() < quorumSize
- && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size())
- >= quorumSize) {
+ if (stronglyAcceptedNodeNum < quorumSize && totalAccepted >= quorumSize) {
return AppendLogResult.WEAK_ACCEPT;
}
@@ -1680,17 +1691,21 @@ public abstract class RaftMember {
}
@SuppressWarnings("java:S2445")
- void commitLog(Log log) throws LogExecutionException {
+ protected void commitLog(Log log) throws LogExecutionException {
long startTime =
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
- synchronized (logManager) {
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
- startTime);
-
- startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
- logManager.commitTo(log.getCurrLogIndex());
+ if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+ synchronized (logManager) {
+ if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+ startTime);
+
+ startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+ logManager.commitTo(log.getCurrLogIndex());
+ }
+ }
+ Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
}
- Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
if (ENABLE_COMMIT_RETURN) {
return;
}
@@ -1701,7 +1716,7 @@ public abstract class RaftMember {
while (!log.isApplied()) {
// wait until the log is applied
try {
- log.wait(5);
+ log.wait(0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LogExecutionException(e);
@@ -1742,7 +1757,7 @@ public abstract class RaftMember {
return tsStatus;
}
- AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
+ protected AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
AppendEntryRequest request = new AppendEntryRequest();
request.setTerm(term.get());
if (serializeNow) {
@@ -2249,7 +2264,7 @@ public abstract class RaftMember {
return allNodes.getId();
}
- enum AppendLogResult {
+ protected enum AppendLogResult {
OK,
TIME_OUT,
LEADERSHIP_STALE,
@@ -2293,4 +2308,14 @@ public abstract class RaftMember {
public void setSkipElection(boolean skipElection) {
this.skipElection = skipElection;
}
+
+ protected boolean containsNode(Node node) {
+ for (Node localNode : allNodes) {
+ if ((localNode.getInternalIp().equals(node.getInternalIp())
+ && localNode.getMetaPort() == node.getMetaPort())) {
+ return true;
+ }
+ }
+ return false;
+ }
}