You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/04/07 01:16:56 UTC
[iotdb] 01/03: fix ack leader with wrong receiver
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1bac74e9ae4ca34a8cf49f86ca260e6e95e1151d
Author: jt <jt...@163.com>
AuthorDate: Tue Mar 29 11:28:17 2022 +0800
fix ack leader with wrong receiver
---
cluster/distribute-dc.sh | 2 +-
.../resources/conf/iotdb-cluster.properties | 3 +-
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 19 +++
.../cluster/client/async/AsyncDataClient.java | 4 -
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 ++
.../iotdb/cluster/config/ClusterDescriptor.java | 6 +
.../org/apache/iotdb/cluster/expr/ExprBench.java | 128 +++++++++++---
.../iotdb/cluster/log/IndirectLogDispatcher.java | 137 ++++++++++-----
.../java/org/apache/iotdb/cluster/log/Log.java | 2 +-
.../org/apache/iotdb/cluster/log/LogAckSender.java | 186 +++++++++++++++++++++
.../apache/iotdb/cluster/log/LogDispatcher.java | 65 +++++--
.../org/apache/iotdb/cluster/log/LogRelay.java | 51 +++++-
.../apache/iotdb/cluster/log/VotingLogList.java | 16 ++
.../cluster/log/appender/BlockingLogAppender.java | 47 ++++--
.../iotdb/cluster/log/appender/LogAppender.java | 7 +-
.../log/appender/SlidingWindowLogAppender.java | 88 ++++++++--
.../cluster/log/logtypes/PhysicalPlanLog.java | 11 ++
.../iotdb/cluster/log/manage/RaftLogManager.java | 17 +-
.../cluster/query/manage/QueryCoordinator.java | 3 +-
.../handlers/caller/AppendNodeEntryHandler.java | 55 +++---
.../server/handlers/caller/HeartbeatHandler.java | 16 +-
.../cluster/server/heartbeat/HeartbeatThread.java | 18 +-
.../cluster/server/member/DataGroupMember.java | 3 +-
.../cluster/server/member/MetaGroupMember.java | 3 +-
.../iotdb/cluster/server/member/RaftMember.java | 174 +++++++++----------
.../iotdb/cluster/server/monitor/NodeReport.java | 20 ++-
.../iotdb/cluster/server/monitor/NodeStatus.java | 34 ++++
.../cluster/server/monitor/NodeStatusManager.java | 9 +-
.../apache/iotdb/cluster/server/monitor/Timer.java | 64 ++++++-
.../cluster/server/service/MetaSyncService.java | 5 +-
.../apache/iotdb/cluster/utils/WeightedList.java | 87 ++++++++++
.../caller/AppendNodeEntryHandlerTest.java | 12 +-
thrift-cluster/src/main/thrift/cluster.thrift | 3 +-
33 files changed, 1018 insertions(+), 287 deletions(-)
diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 1eba6f4264..5e1af6b312 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,6 +1,6 @@
src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
-ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
+ips=(dc13 dc14 dc15 dc16 dc17 dc18)
target_lib_path=/home/jt/iotdb_expr/lib
for ip in ${ips[*]}
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index ff363749b4..56566c327e 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -212,4 +212,5 @@ multi_raft_factor=1
# replicas is high, which may make thread switching costly.
# dispatcher_binding_thread_num=16
-use_indirect_broadcasting=true
\ No newline at end of file
+use_indirect_broadcasting=true
+optimize_indirect_broadcasting=false
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 36c40d04bc..2e5b02cc58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,6 +45,9 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
import org.apache.iotdb.cluster.server.raft.DataRaftService;
import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -79,6 +82,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -250,6 +254,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
}
// we start IoTDB kernel first. then we start the cluster module.
+ Runtime.getRuntime().addShutdownHook(new ShutdownHook());
if (MODE_START.equals(mode)) {
cluster.activeStartNodeMode();
} else if (MODE_ADD.equals(mode)) {
@@ -287,6 +292,20 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
return true;
}
+ private static class ShutdownHook extends Thread {
+
+ @Override
+ public void run() {
+ logger.info(
+ "Total request fanout: {}",
+ Statistic.RAFT_SENDER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
+ for (Entry<Node, NodeStatus> nodeNodeStatusEntry :
+ NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet()) {
+ logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
+ }
+ }
+ }
+
private String clusterConfigCheck() {
if (IoTDBDescriptor.getInstance().getConfig().isReplaceHostNameWithIp()) {
try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index df6d5ead74..cb9c798b68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -147,17 +147,13 @@ public class AsyncDataClient extends TSDataService.AsyncClient {
public static class AsyncDataClientFactory extends AsyncBaseFactory<Node, AsyncDataClient> {
- Exception createStack;
-
public AsyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
super(protocolFactory, category);
- createStack = new Exception();
}
public AsyncDataClientFactory(
TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
super(protocolFactory, category, clientManager);
- createStack = new Exception();
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index c75237beca..187509b2ef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,6 +192,8 @@ public class ClusterConfig {
private int relaySenderNum = 8;
+ private boolean optimizeIndirectBroadcasting = false;
+
/**
* create a clusterConfig class. The internalIP will be set according to the server's hostname. If
* there is something error for getting the ip of the hostname, then set the internalIp as
@@ -590,4 +592,12 @@ public class ClusterConfig {
public void setRelaySenderNum(int relaySenderNum) {
this.relaySenderNum = relaySenderNum;
}
+
+ public boolean isOptimizeIndirectBroadcasting() {
+ return optimizeIndirectBroadcasting;
+ }
+
+ public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
+ this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index c8bfbdc51f..a089e2d75a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -345,6 +345,12 @@ public class ClusterDescriptor {
properties.getProperty(
"use_indirect_broadcasting", String.valueOf(config.isUseIndirectBroadcasting()))));
+ config.setOptimizeIndirectBroadcasting(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "optimize_indirect_broadcasting",
+ String.valueOf(config.isOptimizeIndirectBroadcasting()))));
+
config.setRelaySenderNum(
Integer.parseInt(
properties.getProperty(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 7673e116c7..d94957f74d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -28,20 +28,28 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.Random;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class ExprBench {
+ private static final Logger logger = LoggerFactory.getLogger(ExprBench.class);
+
private AtomicLong requestCounter = new AtomicLong();
private AtomicLong latencySum = new AtomicLong();
private long maxLatency = 0;
@@ -49,45 +57,86 @@ public class ExprBench {
private int workloadSize = 64 * 1024;
private int printInterval = 1000;
private ClientManager clientPool;
- private Node target;
private int maxRequestNum;
private ExecutorService pool = Executors.newCachedThreadPool();
private List<Node> nodeList = new ArrayList<>();
- private int raftFactor = 1;
+ private int[] raftFactors;
+ private int[] rateLimits;
+ private List<EndPoint> endPoints = new ArrayList<>();
+ private Map<EndPoint, RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
+ private Map<EndPoint, Statistic> latencyMap = new ConcurrentHashMap<>();
public ExprBench(Node target) {
- this.target = target;
clientPool = new ClientManager(false, Type.MetaGroupClient);
}
+ private static class EndPoint {
+ private Node node;
+ private int raftId;
+
+ public EndPoint(Node node, int raftId) {
+ this.node = node;
+ this.raftId = raftId;
+ }
+
+ @Override
+ public String toString() {
+ return "EndPoint{" + "node=" + node.getInternalIp() + ", raftId=" + raftId + '}';
+ }
+ }
+
+ private static class Statistic {
+ private AtomicLong sum = new AtomicLong();
+ private AtomicLong cnt = new AtomicLong();
+
+ public void add(long val) {
+ sum.addAndGet(val);
+ cnt.incrementAndGet();
+ }
+
+ @Override
+ public String toString() {
+ return "{" + sum.get() + "," + cnt.get() + "," + (sum.get() * 1.0 / cnt.get()) + "}";
+ }
+ }
+
public void benchmark() {
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
int finalI = i;
pool.submit(
() -> {
- Random random = new Random(123456L + finalI);
+ int endPointIdx = finalI % endPoints.size();
Client client = null;
- try {
- client = clientPool.borrowSyncClient(target, ClientCategory.META);
- } catch (IOException e) {
- e.printStackTrace();
- }
+
ExecutNonQueryReq request = new ExecutNonQueryReq();
DummyPlan plan = new DummyPlan();
plan.setWorkload(new byte[workloadSize]);
plan.setNeedForward(true);
ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+ Map<EndPoint, Node> endPointLeaderMap = new HashMap<>();
+ Node target = null;
long currRequsetNum = -1;
while (true) {
- if (raftFactor > 0) {
- Node node = nodeList.get(random.nextInt(nodeList.size()));
- int raftId = random.nextInt(raftFactor);
- plan.setGroupIdentifier(ClusterUtils.nodeToString(node) + "#" + raftId);
+ EndPoint endPoint = endPoints.get(endPointIdx);
+ RateLimiter rateLimiter = rateLimiterMap.get(endPoint);
+ if (rateLimiter != null) {
+ rateLimiter.acquire(1);
}
+
+ target = endPointLeaderMap.getOrDefault(endPoint, endPoint.node);
+ int raftId = endPoint.raftId;
+ plan.setGroupIdentifier(ClusterUtils.nodeToString(endPoint.node) + "#" + raftId);
+
+ try {
+ client = clientPool.borrowSyncClient(target, ClientCategory.META);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
byteBuffer.clear();
plan.serialize(byteBuffer);
byteBuffer.flip();
@@ -96,12 +145,20 @@ public class ExprBench {
long reqLatency = System.nanoTime();
try {
- client.executeNonQueryPlan(request);
+ TSStatus status = client.executeNonQueryPlan(request);
+ clientPool.returnSyncClient(client, target, ClientCategory.META);
+ if (status.isSetRedirectNode()) {
+ Node leader = new Node().setInternalIp(status.redirectNode.ip).setMetaPort(8880);
+ endPointLeaderMap.put(endPoint, leader);
+ logger.info("Leader of {} is changed to {}", endPoint, leader);
+ }
+
currRequsetNum = requestCounter.incrementAndGet();
if (currRequsetNum > threadNum * 10) {
reqLatency = System.nanoTime() - reqLatency;
maxLatency = Math.max(maxLatency, reqLatency);
latencySum.addAndGet(reqLatency);
+ latencyMap.get(endPoint).add(reqLatency);
}
} catch (TException e) {
e.printStackTrace();
@@ -118,6 +175,7 @@ public class ExprBench {
currRequsetNum * workloadSize / (1024.0 * 1024.0) / elapsedTime,
maxLatency / 1000.0,
(latencySum.get() + 0.0) / currRequsetNum));
+ System.out.println(latencyMap);
}
if (currRequsetNum >= maxRequestNum) {
@@ -136,14 +194,12 @@ public class ExprBench {
public static void main(String[] args) {
ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
Node target = new Node();
- target.setInternalIp(args[0]);
- target.setMetaPort(Integer.parseInt(args[1]));
ExprBench bench = new ExprBench(target);
- bench.maxRequestNum = Integer.parseInt(args[2]);
- bench.threadNum = Integer.parseInt(args[3]);
- bench.workloadSize = Integer.parseInt(args[4]) * 1024;
- bench.printInterval = Integer.parseInt(args[5]);
- String[] nodesSplit = args[6].split(",");
+ bench.maxRequestNum = Integer.parseInt(args[0]);
+ bench.threadNum = Integer.parseInt(args[1]);
+ bench.workloadSize = Integer.parseInt(args[2]) * 1024;
+ bench.printInterval = Integer.parseInt(args[3]);
+ String[] nodesSplit = args[4].split(",");
for (String s : nodesSplit) {
String[] nodeSplit = s.split(":");
Node node = new Node();
@@ -151,8 +207,34 @@ public class ExprBench {
node.setMetaPort(Integer.parseInt(nodeSplit[1]));
bench.nodeList.add(node);
}
- bench.raftFactor = Integer.parseInt(args[7]);
+ String[] raftFactorSplit = args[5].split(",");
+ bench.raftFactors = new int[raftFactorSplit.length];
+ for (int i = 0; i < raftFactorSplit.length; i++) {
+ bench.raftFactors[i] = Integer.parseInt(raftFactorSplit[i]);
+ }
+ if (args.length >= 7) {
+ String[] ratesSplit = args[6].split(",");
+ bench.rateLimits = new int[ratesSplit.length];
+ for (int i = 0; i < ratesSplit.length; i++) {
+ bench.rateLimits[i] = Integer.parseInt(ratesSplit[i]);
+ }
+ }
+
+ List<Node> list = bench.nodeList;
+ for (int i = 0, listSize = list.size(); i < listSize; i++) {
+ Node node = list.get(i);
+ for (int j = 0; j < bench.raftFactors[i]; j++) {
+ EndPoint endPoint = new EndPoint(node, j);
+ bench.endPoints.add(endPoint);
+ bench.latencyMap.put(endPoint, new Statistic());
+ if (bench.rateLimits != null) {
+ bench.rateLimiterMap.put(endPoint, RateLimiter.create(bench.rateLimits[i]));
+ }
+ }
+ }
bench.benchmark();
+
+ System.out.println(bench.latencyMap);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 343cb3d21f..c02f911778 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,24 +19,25 @@
package org.apache.iotdb.cluster.log;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.WeightedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
/**
* IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
@@ -45,10 +46,13 @@ import java.util.concurrent.TimeUnit;
public class IndirectLogDispatcher extends LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(IndirectLogDispatcher.class);
- private Map<Node, List<Node>> directToIndirectFollowerMap;
+
+ private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
public IndirectLogDispatcher(RaftMember member) {
super(member);
+ recalculateDirectFollowerMap();
+ useBatchInLogCatchUp = false;
}
@Override
@@ -59,25 +63,101 @@ public class IndirectLogDispatcher extends LogDispatcher {
@Override
void createQueueAndBindingThreads() {
+ for (Node node : member.getAllNodes()) {
+ if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
+ nodesEnabled.put(node, false);
+ nodesLogQueues.put(node, createQueueAndBindingThread(node));
+ }
+ }
+ }
+
+ @Override
+ public void offer(SendLogRequest request) {
+ super.offer(request);
recalculateDirectFollowerMap();
}
+ @Override
+ protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
+ SendLogRequest newRequest = new SendLogRequest(request);
+ // copy the RPC request so each request can have different sub-receivers but the same log
+ // binary and other fields
+ newRequest.setAppendEntryRequest(new AppendEntryRequest(newRequest.getAppendEntryRequest()));
+ newRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(node));
+ return newRequest;
+ }
+
public void recalculateDirectFollowerMap() {
List<Node> allNodes = new ArrayList<>(member.getAllNodes());
allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
- QueryCoordinator instance = QueryCoordinator.getINSTANCE();
- List<Node> orderedNodes = instance.reorderNodes(allNodes);
- synchronized (this) {
- executorService.shutdown();
- try {
- executorService.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("Dispatcher thread pool of {} cannot be shutdown within 10s", member);
+ Collections.shuffle(allNodes);
+ List<Node> orderedNodes = allNodes;
+
+ nodesEnabled.clear();
+ directToIndirectFollowerMap.clear();
+
+ if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
+ QueryCoordinator instance = QueryCoordinator.getINSTANCE();
+ orderedNodes = instance.reorderNodes(allNodes);
+ long thisLoad =
+ Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt() + 1;
+ long minLoad =
+ NodeStatusManager.getINSTANCE()
+ .getNodeStatus(orderedNodes.get(0), false)
+ .getStatus()
+ .fanoutRequestNum
+ + 1;
+ double loadFactor = 1.05;
+ WeightedList<Node> firstLevelCandidates = new WeightedList<>();
+ firstLevelCandidates.insert(
+ orderedNodes.get(0),
+ 1.0
+ / (NodeStatusManager.getINSTANCE()
+ .getNodeStatus(orderedNodes.get(0), false)
+ .getStatus()
+ .fanoutRequestNum
+ + 1));
+ int firstLevelSize = 1;
+
+ for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
+ Node orderedNode = orderedNodes.get(i);
+ long nodeLoad =
+ NodeStatusManager.getINSTANCE()
+ .getNodeStatus(orderedNode, false)
+ .getStatus()
+ .fanoutRequestNum
+ + 1;
+ if (nodeLoad * 1.0 <= minLoad * loadFactor) {
+ firstLevelCandidates.insert(orderedNode, 1.0 / nodeLoad);
+ }
+ if (nodeLoad > thisLoad) {
+ firstLevelSize = (int) Math.max(firstLevelSize, nodeLoad / thisLoad);
+ }
+ }
+
+ if (firstLevelSize > firstLevelCandidates.size()) {
+ firstLevelSize = firstLevelCandidates.size();
}
- executorService = Executors.newCachedThreadPool();
- directToIndirectFollowerMap = new HashMap<>();
+ List<Node> firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
+
+ Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
+ orderedNodes.removeAll(firstLevelNodes);
+ for (int i = 0; i < orderedNodes.size(); i++) {
+ Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
+ secondLevelNodeMap
+ .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
+ .add(orderedNodes.get(i));
+ }
+
+ for (Node firstLevelNode : firstLevelNodes) {
+ directToIndirectFollowerMap.put(
+ firstLevelNode,
+ secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
+ nodesEnabled.put(firstLevelNode, true);
+ }
+
+ } else {
for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
if (i != j) {
directToIndirectFollowerMap.put(
@@ -85,31 +165,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
} else {
directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
}
+ nodesEnabled.put(orderedNodes.get(i), true);
}
}
- for (Node node : directToIndirectFollowerMap.keySet()) {
- nodesLogQueues.put(node, createQueueAndBindingThread(node));
- }
- }
-
- class DispatcherThread extends LogDispatcher.DispatcherThread {
-
- DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
- super(receiver, logBlockingDeque);
- }
-
- @Override
- void sendLog(SendLogRequest logRequest) {
- logRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(receiver));
- super.sendLog(logRequest);
- }
-
- @Override
- protected AppendEntriesRequest prepareRequest(
- List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
- return super.prepareRequest(logList, currBatch, firstIndex)
- .setSubReceivers(directToIndirectFollowerMap.get(receiver));
- }
+ logger.debug("New relay map: {}", directToIndirectFollowerMap);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index efa095032a..c2b8b70259 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -50,7 +50,7 @@ public abstract class Log implements Comparable<Log> {
private int byteSize = 0;
- public static int getDefaultBufferSize() {
+ public int getDefaultBufferSize() {
return DEFAULT_BUFFER_SIZE;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
new file mode 100644
index 0000000000..3441a01432
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+
+public class LogAckSender {
+
+ private static final Logger logger = LoggerFactory.getLogger(LogAckSender.class);
+
+ private ExecutorService ackSenderPool;
+ private RaftNode header;
+ private RaftMember member;
+ private BlockingQueue<AckRequest> requestQueue = new ArrayBlockingQueue<>(4096);
+ private String baseThreadName;
+
+ public LogAckSender(RaftMember member) {
+ this.member = member;
+ this.header = member.getHeader();
+ ackSenderPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(member.getName() + "-ACKSender");
+ ackSenderPool.submit(this::appendAckLeaderTask);
+ }
+
+ public static class AckRequest {
+
+ private Node leader;
+ private long index;
+ private long term;
+ private long response;
+
+ public AckRequest(Node leader, long index, long term, long response) {
+ this.leader = leader;
+ this.index = index;
+ this.term = term;
+ this.response = response;
+ }
+ }
+
+ public void offer(Node leader, long index, long term, long response) {
+ requestQueue.add(new AckRequest(leader, index, term, response));
+ }
+
+ private void appendAckLeaderTask() {
+ List<AckRequest> ackRequestList = new ArrayList<>();
+ baseThreadName = Thread.currentThread().getName();
+ try {
+ while (!Thread.interrupted()) {
+ ackRequestList.clear();
+ synchronized (requestQueue) {
+ AckRequest req = requestQueue.take();
+ ackRequestList.add(req);
+ requestQueue.drainTo(ackRequestList);
+ }
+
+ appendAckLeader(ackRequestList);
+ Thread.sleep(10);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void appendAckLeader(List<AckRequest> requests) {
+ if (requests.isEmpty()) {
+ return;
+ }
+ int index = 0;
+ AckRequest requestToSend = null;
+ for (; index < requests.size(); index++) {
+ if (requestToSend == null) {
+ requestToSend = requests.get(index);
+ } else {
+ AckRequest currRequest = requests.get(index);
+
+ if (requestToSend.term == currRequest.term
+ && requestToSend.index >= currRequest.index
+ && (requestToSend.response == currRequest.response
+ || requestToSend.response == Response.RESPONSE_STRONG_ACCEPT)) {
+ // currRequest has the same response and leader as requestToSend, but has a smaller
+ // index, so it can be covered by requestToSend, ignore it
+ // continue
+ } else if (requestToSend.term == currRequest.term
+ && requestToSend.index < currRequest.index
+ && (requestToSend.response == currRequest.response
+ || currRequest.response == Response.RESPONSE_STRONG_ACCEPT)) {
+ // currRequest has the same term as requestToSend and a covering response (equal, or
+ // currRequest is a strong accept), but a larger index, so it can replace requestToSend
+ requestToSend = currRequest;
+ } else {
+ // the requests cannot cover each other, send requestToSend first
+ appendAckLeader(
+ requestToSend.leader,
+ requestToSend.index,
+ requestToSend.term,
+ requestToSend.response);
+ requestToSend = currRequest;
+ }
+ }
+ }
+
+ if (requestToSend != null) {
+ appendAckLeader(
+ requestToSend.leader, requestToSend.index, requestToSend.term, requestToSend.response);
+ }
+ }
+
+ private void appendAckLeader(Node leader, long index, long term, long response) {
+ long ackStartTime = Statistic.RAFT_RECEIVER_APPEND_ACK.getOperationStartTime();
+ AppendEntryResult result = new AppendEntryResult();
+ result.setLastLogIndex(index);
+ result.setLastLogTerm(term);
+ result.status = response;
+ result.setHeader(header);
+ result.setReceiver(member.getThisNode());
+
+ Client syncClient = null;
+ try {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ GenericHandler<Void> handler = new GenericHandler<>(leader, null);
+ member.getAsyncClient(leader).acknowledgeAppendEntry(result, handler);
+ } else {
+ syncClient = member.getSyncClient(leader);
+ syncClient.acknowledgeAppendEntry(result);
+ }
+ } catch (TException e) {
+ logger.warn("Cannot send ack of {}-{} to leader {}", index, term, leader, e);
+ } finally {
+ if (syncClient != null) {
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ }
+ Thread.currentThread().setName(baseThreadName + "-" + index + "-" + response);
+ Statistic.RAFT_SEND_RELAY_ACK.add(1);
+ Statistic.RAFT_RECEIVER_APPEND_ACK.calOperationCostTimeFromStart(ackStartTime);
+ }
+
+ public void stop() {
+ ackSenderPool.shutdownNow();
+ try {
+ ackSenderPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Unexpected interruption when waiting for ackSenderPool to end", e);
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 2410b2feaa..3e659a1468 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -58,6 +60,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -72,8 +75,9 @@ public class LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
RaftMember member;
private static final ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
- private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
+ protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueues = new HashMap<>();
+ Map<Node, Boolean> nodesEnabled = new HashMap<>();
ExecutorService executorService;
private static ExecutorService serializationService =
IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
@@ -81,17 +85,21 @@ public class LogDispatcher {
public static int bindingThreadNum = clusterConfig.getDispatcherBindingThreadNum();
public static int maxBatchSize = 10;
+ public static AtomicInteger concurrentSenderNum = new AtomicInteger();
public LogDispatcher(RaftMember member) {
this.member = member;
executorService =
- IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + member.getName());
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ bindingThreadNum * (member.getAllNodes().size() - 1),
+ "LogDispatcher-" + member.getName());
createQueueAndBindingThreads();
}
void createQueueAndBindingThreads() {
for (Node node : member.getAllNodes()) {
if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
+ nodesEnabled.put(node, true);
nodesLogQueues.put(node, createQueueAndBindingThread(node));
}
}
@@ -109,15 +117,27 @@ public class LogDispatcher {
return byteBuffer;
}
+ protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
+ return request;
+ }
+
public void offer(SendLogRequest request) {
// do serialization here to avoid taking LogManager for too long
if (!nodesLogQueues.isEmpty()) {
- request.serializedLogFuture = serializationService.submit(() -> serializeTask(request));
+ SendLogRequest finalRequest = request;
+ request.serializedLogFuture = serializationService.submit(() -> serializeTask(finalRequest));
}
long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
for (Entry<Node, BlockingQueue<SendLogRequest>> entry : nodesLogQueues.entrySet()) {
+ boolean nodeEnabled = this.nodesEnabled.getOrDefault(entry.getKey(), false);
+ if (!nodeEnabled) {
+ continue;
+ }
+
+ request = transformRequest(entry.getKey(), request);
+
BlockingQueue<SendLogRequest> nodeLogQueue = entry.getValue();
try {
boolean addSucceeded;
@@ -173,6 +193,7 @@ public class LogDispatcher {
public static class SendLogRequest {
+ private AppendNodeEntryHandler handler;
private VotingLog votingLog;
private AtomicBoolean leaderShipStale;
private AtomicLong newLeaderTerm;
@@ -201,6 +222,7 @@ public class LogDispatcher {
this.setAppendEntryRequest(request.appendEntryRequest);
this.setQuorumSize(request.quorumSize);
this.setEnqueueTime(request.enqueueTime);
+ this.serializedLogFuture = request.serializedLogFuture;
}
public VotingLog getVotingLog() {
@@ -265,28 +287,27 @@ public class LogDispatcher {
private Peer peer;
Client syncClient;
AsyncClient asyncClient;
+ private String baseName;
DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
this.receiver = receiver;
this.logBlockingDeque = logBlockingDeque;
- this.peer =
- member
- .getPeerMap()
- .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
+ this.peer = member.getPeer(receiver);
if (!clusterConfig.isUseAsyncServer()) {
syncClient = member.getSyncClient(receiver);
}
+ baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
}
@Override
public void run() {
- Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
+ Thread.currentThread().setName(baseName);
try {
while (!Thread.interrupted()) {
synchronized (logBlockingDeque) {
SendLogRequest poll = logBlockingDeque.take();
currBatch.add(poll);
- if (maxBatchSize > 1) {
+ if (maxBatchSize > 1 && useBatchInLogCatchUp) {
logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
}
}
@@ -310,6 +331,8 @@ public class LogDispatcher {
for (SendLogRequest request : currBatch) {
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
request.getVotingLog().getLog().getEnqueueTime());
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE.calOperationCostTimeFromStart(
+ request.getVotingLog().getLog().getCreateTime());
long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
@@ -427,6 +450,8 @@ public class LogDispatcher {
sendLogs(currBatch);
} else {
for (SendLogRequest batch : currBatch) {
+ Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+ batch.getVotingLog().getLog().getCreateTime());
sendLog(batch);
}
}
@@ -442,21 +467,29 @@ public class LogDispatcher {
receiver,
logRequest.leaderShipStale,
logRequest.newLeaderTerm,
- peer,
logRequest.quorumSize);
// TODO add async interface
int retries = 5;
try {
long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
for (int i = 0; i < retries; i++) {
- logRequest.getVotingLog().getFailedNodeIds().remove(receiver.nodeIdentifier);
- logRequest.getVotingLog().getStronglyAcceptedNodeIds().remove(Integer.MAX_VALUE);
+ int concurrentSender = concurrentSenderNum.incrementAndGet();
+ Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
AppendEntryResult result = syncClient.appendEntry(logRequest.appendEntryRequest);
+ concurrentSenderNum.decrementAndGet();
if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
Thread.sleep(100);
+ Statistic.RAFT_SENDER_OOW.add(1);
} else {
- Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+ long sendLogTime =
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+ NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(receiver, false);
+ nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+ nodeStatus.getSendEntryNum().incrementAndGet();
+
+ long handleStart = Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.getOperationStartTime();
handler.onComplete(result);
+ Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.calOperationCostTimeFromStart(handleStart);
break;
}
}
@@ -477,7 +510,6 @@ public class LogDispatcher {
receiver,
logRequest.leaderShipStale,
logRequest.newLeaderTerm,
- peer,
logRequest.quorumSize);
AsyncClient client = member.getAsyncClient(receiver);
@@ -491,11 +523,15 @@ public class LogDispatcher {
}
void sendLog(SendLogRequest logRequest) {
+ Thread.currentThread()
+ .setName(baseName + "-" + logRequest.getVotingLog().getLog().getCurrLogIndex());
if (clusterConfig.isUseAsyncServer()) {
sendLogAsync(logRequest);
} else {
sendLogSync(logRequest);
}
+ Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
+ logRequest.getVotingLog().getLog().getCreateTime());
}
class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
@@ -511,7 +547,6 @@ public class LogDispatcher {
receiver,
sendLogRequest.getLeaderShipStale(),
sendLogRequest.getNewLeaderTerm(),
- peer,
sendLogRequest.getQuorumSize());
singleEntryHandlers.add(handler);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 69645586d7..5972fba588 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -25,18 +25,22 @@ import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/** LogRelay is used by followers to forward entries from the leader to other followers. */
public class LogRelay {
+ private static final Logger logger = LoggerFactory.getLogger(LogRelay.class);
+
private ConcurrentSkipListSet<RelayEntry> entryHeap = new ConcurrentSkipListSet<>();
private static final int RELAY_NUMBER =
ClusterDescriptor.getInstance().getConfig().getRelaySenderNum();
@@ -46,11 +50,8 @@ public class LogRelay {
public LogRelay(RaftMember raftMember) {
this.raftMember = raftMember;
relaySenders =
- Executors.newFixedThreadPool(
- RELAY_NUMBER,
- new ThreadFactoryBuilder()
- .setNameFormat(raftMember.getName() + "-RelaySender-%d")
- .build());
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ RELAY_NUMBER, raftMember.getName() + "-RelaySender");
for (int i = 0; i < RELAY_NUMBER; i++) {
relaySenders.submit(new RelayThread());
}
@@ -65,6 +66,7 @@ public class LogRelay {
}
private void offer(RelayEntry entry) {
+ long operationStartTime = Statistic.RAFT_SENDER_RELAY_OFFER_LOG.getOperationStartTime();
synchronized (entryHeap) {
while (entryHeap.size()
> ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
@@ -77,6 +79,7 @@ public class LogRelay {
entryHeap.add(entry);
entryHeap.notifyAll();
}
+ Statistic.RAFT_SENDER_RELAY_OFFER_LOG.calOperationCostTimeFromStart(operationStartTime);
}
public void offer(AppendEntriesRequest request, List<Node> receivers) {
@@ -87,6 +90,7 @@ public class LogRelay {
@Override
public void run() {
+ String baseName = Thread.currentThread().getName();
while (!Thread.interrupted()) {
RelayEntry relayEntry;
synchronized (entryHeap) {
@@ -102,9 +106,25 @@ public class LogRelay {
}
}
+ logger.debug("Relaying {}", relayEntry);
+
if (relayEntry.singleRequest != null) {
+ Thread.currentThread()
+ .setName(
+ baseName
+ + "-"
+ + (relayEntry.singleRequest.prevLogIndex + 1)
+ + "-"
+ + relayEntry.receivers);
raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
} else if (relayEntry.batchRequest != null) {
+ Thread.currentThread()
+ .setName(
+ baseName
+ + "-"
+ + (relayEntry.batchRequest.prevLogIndex + 1)
+ + "-"
+ + relayEntry.receivers);
raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
}
@@ -138,6 +158,13 @@ public class LogRelay {
return 0;
}
+ @Override
+ public String toString() {
+ long index = singleRequest != null ? singleRequest.prevLogIndex : batchRequest.prevLogIndex;
+ index++;
+ return "RelayEntry{" + index + "," + receivers + "}";
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -153,7 +180,7 @@ public class LogRelay {
@Override
public int hashCode() {
- return Objects.hash(singleRequest);
+ return Objects.hash(singleRequest, batchRequest);
}
@Override
@@ -161,4 +188,12 @@ public class LogRelay {
return Long.compare(this.getIndex(), o.getIndex());
}
}
+
+ public RelayEntry first() {
+ try {
+ return entryHeap.isEmpty() ? null : entryHeap.first();
+ } catch (NoSuchElementException e) {
+ return null;
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 59bc40bf25..7b1dde2d3e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -20,14 +20,20 @@
package org.apache.iotdb.cluster.log;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
public class VotingLogList {
+ private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
+
private List<VotingLog> logList = new ArrayList<>();
private volatile long currTerm = -1;
private int quorumSize;
@@ -63,6 +69,7 @@ public class VotingLogList {
* @return the lastly removed entry if any.
*/
public void onStronglyAccept(long index, long term, int acceptingNodeId) {
+ logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNodeId);
int lastEntryIndexToCommit = -1;
List<VotingLog> acceptedLogs;
@@ -93,6 +100,15 @@ public class VotingLogList {
}
if (lastEntryIndexToCommit != -1) {
+ Log lastLog = acceptedLogs.get(acceptedLogs.size() - 1).log;
+ synchronized (member.getLogManager()) {
+ try {
+ member.getLogManager().commitTo(lastLog.getCurrLogIndex());
+ } catch (LogExecutionException e) {
+ logger.error("Fail to commit {}", lastLog, e);
+ }
+ }
+
for (VotingLog acceptedLog : acceptedLogs) {
synchronized (acceptedLog) {
acceptedLog.acceptedTime.set(System.nanoTime());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
index d2d9056a16..a08b542f93 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.cluster.log.appender;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -30,6 +32,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.Buffer;
import java.util.List;
/**
@@ -57,24 +60,28 @@ public class BlockingLogAppender implements LogAppender {
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- public AppendEntryResult appendEntry(
- long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
- long resp = checkPrevLogIndex(prevLogIndex);
+ public AppendEntryResult appendEntry(AppendEntryRequest request, Log log) {
+ long resp = checkPrevLogIndex(request.prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
return new AppendEntryResult(resp).setHeader(member.getHeader());
}
- long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
long success;
AppendEntryResult result = new AppendEntryResult();
synchronized (logManager) {
- success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+ success =
+ logManager.maybeAppend(
+ request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
if (success != -1) {
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
+ if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) {
+ request.entry.rewind();
+ member.getLogRelay().offer(request, request.subReceivers);
+ }
}
}
- Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+
if (success != -1) {
logger.debug("{} append a new log {}", member.getName(), log);
result.status = Response.RESPONSE_STRONG_ACCEPT;
@@ -82,6 +89,7 @@ public class BlockingLogAppender implements LogAppender {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
}
+ result.setHeader(request.getHeader());
return result;
}
@@ -134,36 +142,43 @@ public class BlockingLogAppender implements LogAppender {
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- public AppendEntryResult appendEntries(
- long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs) {
logger.debug(
"{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
member.getName(),
- prevLogIndex,
- prevLogTerm,
- leaderCommit);
+ request.prevLogIndex,
+ request.prevLogTerm,
+ request.leaderCommit);
if (logs.isEmpty()) {
return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(member.getHeader());
}
- long resp = checkPrevLogIndex(prevLogIndex);
+ long resp = checkPrevLogIndex(request.prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
return new AppendEntryResult(resp).setHeader(member.getHeader());
}
AppendEntryResult result = new AppendEntryResult();
synchronized (logManager) {
- long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
- resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
- Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+ resp =
+ logManager.maybeAppend(
+ request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
if (resp != -1) {
if (logger.isDebugEnabled()) {
logger.debug(
- "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
+ "{} append a new log list {}, commit to {}",
+ member.getName(),
+ logs,
+ request.leaderCommit);
}
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
+
+ if (request.isSetSubReceivers()) {
+ request.entries.forEach(Buffer::rewind);
+ member.getLogRelay().offer(request, request.subReceivers);
+ }
} else {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
index 01f16daf09..ca0eea3bf0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.cluster.log.appender;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import java.util.List;
@@ -30,10 +32,9 @@ import java.util.List;
*/
public interface LogAppender {
- AppendEntryResult appendEntries(
- long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs);
+ AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs);
- AppendEntryResult appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log);
+ AppendEntryResult appendEntry(AppendEntryRequest request, Log log);
void reset();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index f2a19e8039..1ab08449bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -22,17 +22,24 @@ package org.apache.iotdb.cluster.log.appender;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.Buffer;
import java.util.Arrays;
import java.util.List;
public class SlidingWindowLogAppender implements LogAppender {
+ private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class);
+
private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
private int windowLength = 0;
private Log[] logWindow = new Log[windowCapacity];
@@ -115,25 +122,43 @@ public class SlidingWindowLogAppender implements LogAppender {
// flush [0, flushPos)
List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+ logger.debug(
+ "Flushing {} entries to log, first {}, last {}",
+ logs.size(),
+ logs.get(0),
+ logs.get(logs.size() - 1));
long success =
logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
if (success != -1) {
- System.arraycopy(logWindow, flushPos, logWindow, 0, windowCapacity - flushPos);
- System.arraycopy(prevTerms, flushPos, prevTerms, 0, windowCapacity - flushPos);
- for (int i = 1; i <= flushPos; i++) {
- logWindow[windowCapacity - i] = null;
- }
+ moveWindowRightward(flushPos);
}
- firstPosPrevIndex = logManager.getLastLogIndex();
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(firstPosPrevIndex);
result.setLastLogTerm(logManager.getLastLogTerm());
return success;
}
+ private void moveWindowRightward(int step) {
+ System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
+ System.arraycopy(prevTerms, step, prevTerms, 0, windowCapacity - step);
+ for (int i = 1; i <= step; i++) {
+ logWindow[windowCapacity - i] = null;
+ }
+ firstPosPrevIndex = logManager.getLastLogIndex();
+ }
+
+ private void moveWindowLeftward(int step) {
+ int length = Math.max(windowCapacity - step, 0);
+ System.arraycopy(logWindow, 0, logWindow, step, length);
+ System.arraycopy(prevTerms, 0, prevTerms, step, length);
+ for (int i = 0; i < length; i++) {
+ logWindow[i] = null;
+ }
+ firstPosPrevIndex = logManager.getLastLogIndex();
+ }
+
@Override
- public AppendEntryResult appendEntries(
- long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+ public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Log> logs) {
if (logs.isEmpty()) {
return new AppendEntryResult(Response.RESPONSE_AGREE)
.setHeader(member.getPartitionGroup().getHeader());
@@ -141,24 +166,56 @@ public class SlidingWindowLogAppender implements LogAppender {
AppendEntryResult result = null;
for (Log log : logs) {
- result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
+ result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
if (result.status != Response.RESPONSE_AGREE
&& result.status != Response.RESPONSE_STRONG_ACCEPT
&& result.status != Response.RESPONSE_WEAK_ACCEPT) {
return result;
}
- prevLogIndex = log.getCurrLogIndex();
- prevLogTerm = log.getCurrLogTerm();
+ request.prevLogIndex = log.getCurrLogIndex();
+ request.prevLogTerm = log.getCurrLogTerm();
+ }
+ if (request.isSetSubReceivers()) {
+ request.entries.forEach(Buffer::rewind);
+ member.getLogRelay().offer(request, request.subReceivers);
}
return result;
}
@Override
- public AppendEntryResult appendEntry(
+ public AppendEntryResult appendEntry(AppendEntryRequest request, Log log) {
+
+ AppendEntryResult result = null;
+ long start = System.currentTimeMillis();
+ long retryTime = 0;
+ long maxRetry = 10000;
+ while (result == null
+ || result.status == Response.RESPONSE_OUT_OF_WINDOW && retryTime < maxRetry) {
+ result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
+ retryTime = System.currentTimeMillis() - start;
+ if (result.status == Response.RESPONSE_OUT_OF_WINDOW && retryTime < maxRetry) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ result.setHeader(request.getHeader());
+
+ if (request.isSetSubReceivers() && !request.getSubReceivers().isEmpty()) {
+ request.entry.rewind();
+ member.getLogRelay().offer(request, request.subReceivers);
+ }
+
+ return result;
+ }
+
+ private AppendEntryResult appendEntry(
long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
- long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
long appendedPos = 0;
AppendEntryResult result = new AppendEntryResult();
@@ -170,6 +227,7 @@ public class SlidingWindowLogAppender implements LogAppender {
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
+ moveWindowLeftward(-windowPos);
} else if (windowPos < windowCapacity) {
// the new entry falls into the window
logWindow[windowPos] = log;
@@ -186,14 +244,12 @@ public class SlidingWindowLogAppender implements LogAppender {
Statistic.RAFT_WINDOW_LENGTH.add(windowLength);
} else {
- Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
result.setHeader(member.getPartitionGroup().getHeader());
return result;
}
}
- Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
if (appendedPos == -1) {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index 1f3882b398..5d5793d350 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.log.logtypes;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.slf4j.Logger;
@@ -46,6 +47,16 @@ public class PhysicalPlanLog extends Log {
this.plan = plan;
}
+ @Override
+ public int getDefaultBufferSize() {
+ if (plan instanceof DummyPlan) {
+ int workloadSize =
+ ((DummyPlan) plan).getWorkload() == null ? 0 : ((DummyPlan) plan).getWorkload().length;
+ return workloadSize + 512;
+ }
+ return DEFAULT_BUFFER_SIZE;
+ }
+
@Override
public ByteBuffer serialize() {
PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 4b2309281d..406245582c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -682,8 +682,15 @@ public abstract class RaftLogManager {
"There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
unappliedLogSize);
try {
- Thread.sleep(
- unappliedLogSize - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+ synchronized (changeApplyCommitIndexCond) {
+ changeApplyCommitIndexCond.wait(
+ Math.min(
+ (unappliedLogSize
+ - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem())
+ / 10
+ + 1,
+ 1000));
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -954,7 +961,7 @@ public abstract class RaftLogManager {
}
public void checkAppliedLogIndex() {
- while (!Thread.currentThread().isInterrupted()) {
+ while (!Thread.interrupted()) {
try {
doCheckAppliedLogIndex();
} catch (IndexOutOfBoundsException e) {
@@ -976,7 +983,7 @@ public abstract class RaftLogManager {
|| nextToCheckIndex > getCommittedEntryManager().getLastIndex()
|| (blockAppliedCommitIndex > 0 && blockAppliedCommitIndex < nextToCheckIndex)) {
// avoid spinning
- Thread.sleep(0);
+ Thread.sleep(100);
return;
}
Log log = getCommittedEntryManager().getEntry(nextToCheckIndex);
@@ -992,7 +999,7 @@ public abstract class RaftLogManager {
synchronized (log) {
while (!log.isApplied() && maxHaveAppliedCommitIndex < log.getCurrLogIndex()) {
// wait until the log is applied or a newer snapshot is installed
- log.wait(1);
+ log.wait(10);
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
index 21ec9f92ad..8353dda742 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
@@ -37,7 +37,8 @@ public class QueryCoordinator {
private static final QueryCoordinator INSTANCE = new QueryCoordinator();
private static final NodeStatusManager STATUS_MANAGER = NodeStatusManager.getINSTANCE();
- private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
+ private final Comparator<Node> nodeComparator =
+ Comparator.comparingLong(o -> getNodeStatus(o).getStatus().fanoutRequestNum);
private QueryCoordinator() {
// singleton class
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 7e703a6dc1..7f8c7df4f4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -56,8 +55,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
protected AtomicLong receiverTerm;
protected VotingLog log;
protected AtomicBoolean leaderShipStale;
- protected Node receiver;
- protected Peer peer;
+ protected Node directReceiver;
protected int quorumSize;
// nano start time when the send begins
@@ -79,8 +77,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
// the request already failed
return;
}
+
+ Node trueReceiver = response.isSetReceiver() ? response.receiver : directReceiver;
+
logger.debug(
- "{}: Append response {} from {} for log {}", member.getName(), response, receiver, log);
+ "{}: Append response {} from {} for log {}", member.getName(), response, trueReceiver, log);
if (leaderShipStale.get()) {
// someone has rejected this log because the leadership is stale
return;
@@ -94,8 +95,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
.onStronglyAccept(
log.getLog().getCurrLogIndex(),
log.getLog().getCurrLogTerm(),
- receiver.nodeIdentifier);
- peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
+ trueReceiver.nodeIdentifier);
+ member.getPeer(trueReceiver).setMatchIndex(response.lastLogIndex);
} else if (resp > 0) {
// a response > 0 is the follower's term
// the leader ship is stale, wait for the new leader's heartbeat
@@ -103,7 +104,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
logger.debug(
"{}: Received a rejection from {} because term is stale: {}/{}, log: {}",
member.getName(),
- receiver,
+ trueReceiver,
prevReceiverTerm,
resp,
log);
@@ -120,14 +121,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
} else if (resp == RESPONSE_WEAK_ACCEPT) {
synchronized (log) {
- log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
+ log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier);
if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
>= quorumSize) {
log.acceptedTime.set(System.nanoTime());
- if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
- member.removeAppendLogHandler(
- new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
- }
}
log.notifyAll();
}
@@ -135,13 +132,20 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
// e.g., Response.RESPONSE_LOG_MISMATCH
if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) {
logger.debug(
- "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+ "{}: The log {} is rejected by {} because: {}",
+ member.getName(),
+ log,
+ trueReceiver,
+ resp);
} else {
logger.warn(
- "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+ "{}: The log {} is rejected by {} because: {}",
+ member.getName(),
+ log,
+ trueReceiver,
+ resp);
+ onFail(trueReceiver);
}
-
- onFail();
}
// rejected because the receiver's logs are stale or the receiver has no cluster info, just
// wait for the heartbeat to handle
@@ -154,17 +158,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
"{}: Cannot append log {}: cannot connect to {}: {}",
member.getName(),
log,
- receiver,
+ directReceiver,
exception.getMessage());
} else {
- logger.warn("{}: Cannot append log {} to {}", member.getName(), log, receiver, exception);
+ logger.warn(
+ "{}: Cannot append log {} to {}", member.getName(), log, directReceiver, exception);
}
- onFail();
+ onFail(directReceiver);
}
- private void onFail() {
+ private void onFail(Node trueReceiver) {
synchronized (log) {
- log.getFailedNodeIds().add(receiver.nodeIdentifier);
+ log.getFailedNodeIds().add(trueReceiver.nodeIdentifier);
if (log.getFailedNodeIds().size() > quorumSize) {
// quorum members have failed, there is no need to wait for others
log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
@@ -189,12 +194,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
this.leaderShipStale = leaderShipStale;
}
- public void setPeer(Peer peer) {
- this.peer = peer;
- }
-
- public void setReceiver(Node follower) {
- this.receiver = follower;
+ public void setDirectReceiver(Node follower) {
+ this.directReceiver = follower;
}
public void setReceiverTerm(AtomicLong receiverTerm) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index af4fbc5445..f89cd56241 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -53,8 +53,8 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
@Override
public void onComplete(HeartBeatResponse resp) {
long followerTerm = resp.getTerm();
- if (logger.isDebugEnabled()) {
- logger.debug(
+ if (logger.isTraceEnabled()) {
+ logger.trace(
"{}: Received a heartbeat response {} for last log index {}",
memberName,
followerTerm,
@@ -89,8 +89,8 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
long lastLogTerm = resp.getLastLogTerm();
long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
long localLastLogTerm = localMember.getLogManager().getLastLogTerm();
- if (logger.isDebugEnabled()) {
- logger.debug(
+ if (logger.isTraceEnabled()) {
+ logger.trace(
"{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
memberName,
follower,
@@ -100,11 +100,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
localLastLogTerm);
}
- Peer peer =
- localMember
- .getPeerMap()
- .computeIfAbsent(
- follower, k -> new Peer(localMember.getLogManager().getLastLogIndex()));
+ Peer peer = localMember.getPeer(follower);
if (!localMember.getLogManager().isLogUpToDate(lastLogTerm, lastLogIdx)
|| !localMember.getLogManager().matchTerm(lastLogTerm, lastLogIdx)) {
// the follower is not up-to-date
@@ -119,7 +115,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
if (lastLogIdx == peer.getLastHeartBeatIndex()) {
// the follower's lastLogIndex is unchanged, increase inconsistent counter
int inconsistentNum = peer.incInconsistentHeartbeatNum();
- if (inconsistentNum >= 10) {
+ if (inconsistentNum >= 1000) {
logger.info(
"{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index c5032966ea..acc34cf6c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -101,7 +101,7 @@ public class HeartbeatThread implements Runnable {
localMember.setCharacter(NodeCharacter.ELECTOR);
localMember.setLeader(ClusterConstant.EMPTY_NODE);
} else {
- logger.debug(
+ logger.trace(
"{}: Heartbeat from leader {} is still valid",
memberName,
localMember.getLeader());
@@ -158,7 +158,7 @@ public class HeartbeatThread implements Runnable {
@SuppressWarnings("java:S2445")
private void sendHeartbeats(Collection<Node> nodes) {
if (logger.isDebugEnabled()) {
- logger.debug(
+ logger.trace(
"{}: Send heartbeat to {} followers, commit log index = {}",
memberName,
nodes.size() - 1,
@@ -200,7 +200,7 @@ public class HeartbeatThread implements Runnable {
if (client != null) {
// connecting to the local node results in a null
try {
- logger.debug("{}: Sending heartbeat to {}", memberName, node);
+ logger.trace("{}: Sending heartbeat to {}", memberName, node);
client.sendHeartbeat(request, new HeartbeatHandler(localMember, node));
} catch (Exception e) {
logger.warn("{}: Cannot send heart beat to node {}", memberName, node, e);
@@ -231,7 +231,7 @@ public class HeartbeatThread implements Runnable {
Client client = localMember.getSyncHeartbeatClient(node);
if (client != null) {
try {
- logger.debug("{}: Sending heartbeat to {}", memberName, node);
+ logger.trace("{}: Sending heartbeat to {}", memberName, node);
HeartBeatResponse heartBeatResponse = client.sendHeartbeat(req);
heartbeatHandler.onComplete(heartBeatResponse);
} catch (TTransportException e) {
@@ -266,6 +266,16 @@ public class HeartbeatThread implements Runnable {
logger.info("{}: Winning the election because the node is the only node.", memberName);
}
+ if (!ClusterUtils.isNodeEquals(
+ localMember.getThisNode(), localMember.getPartitionGroup().getHeader().node)) {
+ long electionWait = getElectionRandomWaitMs();
+ logger.info(
+ "{}: Sleep {}ms before the first election as this node is not the preferred " + "leader",
+ memberName,
+ electionWait);
+ Thread.sleep(electionWait);
+ }
+
// the election goes on until this node becomes a follower or a leader
while (localMember.getCharacter() == NodeCharacter.ELECTOR) {
startElection();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index c43401af57..3114182960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -1006,7 +1006,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
lastHeartbeatReceivedTime,
prevLastLogIndex,
- logManager.getMaxHaveAppliedCommitIndex());
+ logManager.getMaxHaveAppliedCommitIndex(),
+ logRelay != null ? logRelay.first() : null);
}
@TestOnly
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index e07452cbd1..637ce44542 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1800,7 +1800,8 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
readOnly,
lastHeartbeatReceivedTime,
prevLastLogIndex,
- logManager.getMaxHaveAppliedCommitIndex());
+ logManager.getMaxHaveAppliedCommitIndex(),
+ logRelay != null ? logRelay.first() : null);
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4df17e12e9..db75f3b1c2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
import org.apache.iotdb.cluster.log.HardState;
import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogAckSender;
import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
@@ -72,6 +73,7 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -110,7 +112,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketTimeoutException;
-import java.nio.Buffer;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -132,6 +133,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
/**
* RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
@@ -287,6 +289,10 @@ public abstract class RaftMember implements RaftMemberMBean {
protected LogRelay logRelay;
+ protected LogAckSender ackSender;
+
+ private ThreadLocal<String> threadBaseName = new ThreadLocal<>();
+
protected RaftMember() {}
protected RaftMember(String name, ClientManager clientManager) {
@@ -325,6 +331,8 @@ public abstract class RaftMember implements RaftMemberMBean {
getName() + "-SerialToParallel");
commitLogPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("RaftCommitLog");
+ ackSender = new LogAckSender(this);
+
if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
logRelay = new LogRelay(this);
}
@@ -416,6 +424,10 @@ public abstract class RaftMember implements RaftMemberMBean {
if (logRelay != null) {
logRelay.stop();
}
+ if (ackSender != null) {
+ ackSender.stop();
+ }
+
leader.set(ClusterConstant.EMPTY_NODE);
catchUpService = null;
heartBeatService = null;
@@ -572,16 +584,22 @@ public abstract class RaftMember implements RaftMemberMBean {
return Response.RESPONSE_AGREE;
}
+ private String getThreadBaseName() {
+ if (threadBaseName.get() == null) {
+ threadBaseName.set(Thread.currentThread().getName());
+ }
+ return threadBaseName.get();
+ }
/**
* Process an AppendEntryRequest. First check the term of the leader, then parse the log and
* finally see if we can find a position to append the log.
*/
public AppendEntryResult appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+ long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
+ Thread.currentThread()
+ .setName(getThreadBaseName() + "-appending-" + (request.prevLogIndex + 1));
AppendEntryResult result = appendEntryInternal(request);
- if (request.isSetSubReceivers()) {
- request.entry.rewind();
- logRelay.offer(request, request.subReceivers);
- }
+ Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(operationStartTime);
return result;
}
@@ -600,54 +618,19 @@ public abstract class RaftMember implements RaftMemberMBean {
log.setByteSize(logByteSize);
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
- AppendEntryResult result =
- getLogAppender()
- .appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
- result.setHeader(request.getHeader());
+ long appendStartTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+ AppendEntryResult result = getLogAppender().appendEntry(request, log);
+ Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result.status);
if (!request.isFromLeader) {
- appendAckLeader(request.leader, log, result.status);
- Statistic.RAFT_SEND_RELAY_ACK.add(1);
+ ackSender.offer(request.leader, log.getCurrLogIndex(), log.getCurrLogTerm(), result.status);
}
return result;
}
- private void appendAckLeader(Node leader, Log log, long response) {
- AppendEntryResult result = new AppendEntryResult();
- result.setLastLogIndex(log.getCurrLogIndex());
- result.setLastLogTerm(log.getCurrLogTerm());
- result.status = response;
- result.setHeader(getHeader());
-
- Client syncClient = null;
- try {
- if (config.isUseAsyncServer()) {
- GenericHandler<Void> handler = new GenericHandler<>(leader, null);
- getAsyncClient(leader).acknowledgeAppendEntry(result, handler);
- } else {
- syncClient = getSyncClient(leader);
- syncClient.acknowledgeAppendEntry(result);
- }
- } catch (TException e) {
- logger.warn("Cannot send ack of {} to leader {}", log, leader, e);
- } finally {
- if (syncClient != null) {
- ClientUtils.putBackSyncClient(syncClient);
- }
- }
- }
-
- public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers)
- throws UnknownLogTypeException {
- AppendEntryResult result = appendEntry(request);
- request.entry.rewind();
- logRelay.offer(request, subFollowers);
- return result;
- }
-
public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
request.setIsFromLeader(false);
request.setSubReceiversIsSet(false);
@@ -658,8 +641,19 @@ public abstract class RaftMember implements RaftMemberMBean {
getAsyncClient(subFollower)
.appendEntry(request, new IndirectAppendHandler(subFollower, request));
} else {
+ long operationStartTime = Statistic.RAFT_SENDER_RELAY_LOG.getOperationStartTime();
syncClient = getSyncClient(subFollower);
+
+ int concurrentSender = concurrentSenderNum.incrementAndGet();
+ Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
syncClient.appendEntry(request);
+ concurrentSenderNum.decrementAndGet();
+
+ long sendLogTime =
+ Statistic.RAFT_SENDER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
+ NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
+ nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+ nodeStatus.getSendEntryNum().incrementAndGet();
}
} catch (TException e) {
logger.error("Cannot send {} to {}", request, subFollower, e);
@@ -697,12 +691,7 @@ public abstract class RaftMember implements RaftMemberMBean {
/** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
public AppendEntryResult appendEntries(AppendEntriesRequest request)
throws UnknownLogTypeException {
- AppendEntryResult result = appendEntriesInternal(request);
- if (request.isSetSubReceivers()) {
- request.entries.forEach(Buffer::rewind);
- logRelay.offer(request, request.subReceivers);
- }
- return result;
+ return appendEntriesInternal(request);
}
/** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
@@ -736,9 +725,10 @@ public abstract class RaftMember implements RaftMemberMBean {
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
- response =
- getLogAppender()
- .appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+ long appendStartTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+ response = getLogAppender().appendEntries(request, logs);
+ Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(appendStartTime);
+
if (logger.isDebugEnabled()) {
logger.debug(
"{} AppendEntriesRequest of log size {} completed with result {}",
@@ -750,7 +740,8 @@ public abstract class RaftMember implements RaftMemberMBean {
if (!request.isFromLeader) {
// TODO: use batch ack
for (Log log : logs) {
- appendAckLeader(request.leader, log, response.status);
+ ackSender.offer(
+ request.leader, log.getCurrLogIndex(), log.getCurrLogTerm(), response.status);
}
Statistic.RAFT_SEND_RELAY_ACK.add(1);
}
@@ -771,12 +762,11 @@ public abstract class RaftMember implements RaftMemberMBean {
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- Peer peer,
int quorumSize) {
AsyncClient client = getSendLogAsyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
+ getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
try {
client.appendEntry(request, handler);
logger.debug("{} sending a log to {}: {}", name, node, log);
@@ -1414,8 +1404,8 @@ public abstract class RaftMember implements RaftMemberMBean {
}
}
- public Map<Node, Peer> getPeerMap() {
- return peerMap;
+ public Peer getPeer(Node node) {
+ return peerMap.computeIfAbsent(node, r -> new Peer(getLogManager().getLastLogIndex()));
}
/** @return true if there is a log whose index is "index" and term is "term", false otherwise */
@@ -1752,9 +1742,10 @@ public abstract class RaftMember implements RaftMemberMBean {
int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
long nextTimeToPrint = 5000;
- long waitStart = System.currentTimeMillis();
+ long waitStart = System.nanoTime();
long alreadyWait = 0;
+ String threadBaseName = Thread.currentThread().getName();
synchronized (log) {
while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| stronglyAcceptedNodeNum < quorumSize
@@ -1769,7 +1760,16 @@ public abstract class RaftMember implements RaftMemberMBean {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when sending a log", e);
}
- alreadyWait = System.currentTimeMillis() - waitStart;
+ Thread.currentThread()
+ .setName(
+ threadBaseName
+ + "-waiting-"
+ + log.getLog().getCurrLogIndex()
+ + "-"
+ + log.getStronglyAcceptedNodeIds()
+ + "-"
+ + log.getWeaklyAcceptedNodeIds());
+ alreadyWait = (System.nanoTime() - waitStart) / 1000000;
if (alreadyWait > nextTimeToPrint) {
logger.info(
"Still not receive enough votes for {}, strongly accepted {}, weakly "
@@ -1780,6 +1780,7 @@ public abstract class RaftMember implements RaftMemberMBean {
log.getStronglyAcceptedNodeIds(),
log.getWeaklyAcceptedNodeIds(),
votingLogList.size(),
+ alreadyWait,
(log.getLog().getSequenceStartTime() - waitStart) / 1000000,
(log.getLog().getEnqueueTime() - waitStart) / 1000000,
(log.acceptedTime.get() - waitStart) / 1000000);
@@ -1790,6 +1791,7 @@ public abstract class RaftMember implements RaftMemberMBean {
totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
}
}
+ Thread.currentThread().setName(threadBaseName);
if (alreadyWait > 15000) {
logger.info(
@@ -1820,6 +1822,7 @@ public abstract class RaftMember implements RaftMemberMBean {
|| (totalAccepted < quorumSize)
|| votingLogList.size() > config.getMaxNumOfLogsInMem())
&& !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE))) {
+
waitAppendResultLoop(log, quorumSize);
}
stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
@@ -2177,9 +2180,9 @@ public abstract class RaftMember implements RaftMemberMBean {
}
if (config.isUseAsyncServer()) {
- sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+ sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize);
} else {
- sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+ sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize);
}
}
@@ -2217,12 +2220,11 @@ public abstract class RaftMember implements RaftMemberMBean {
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- Peer peer,
int quorumSize) {
Client client = getSyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
+ getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -2248,14 +2250,12 @@ public abstract class RaftMember implements RaftMemberMBean {
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
- Peer peer,
int quorumSize) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
- handler.setReceiver(node);
+ handler.setDirectReceiver(node);
handler.setLeaderShipStale(leaderShipStale);
handler.setLog(log);
handler.setMember(this);
- handler.setPeer(peer);
handler.setReceiverTerm(newLeaderTerm);
handler.setQuorumSize(quorumSize);
if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
@@ -2283,26 +2283,22 @@ public abstract class RaftMember implements RaftMemberMBean {
private long checkRequestTerm(long leaderTerm, Node leader) {
long localTerm;
- synchronized (logManager) {
- // if the request comes before the heartbeat arrives, the local term may be smaller than the
- // leader term
- localTerm = term.get();
- if (leaderTerm < localTerm) {
- logger.debug(
- "{} rejected the AppendEntriesRequest for term: {}/{}", name, leaderTerm, localTerm);
- return localTerm;
+ // if the request comes before the heartbeat arrives, the local term may be smaller than the
+ // leader term
+ localTerm = term.get();
+ if (leaderTerm < localTerm) {
+ logger.debug(
+ "{} rejected the AppendEntriesRequest for term: {}/{}", name, leaderTerm, localTerm);
+ return localTerm;
+ } else {
+ if (leaderTerm > localTerm) {
+ stepDown(leaderTerm, true);
} else {
- if (leaderTerm > localTerm) {
- stepDown(leaderTerm, true);
- } else {
- lastHeartbeatReceivedTime = System.currentTimeMillis();
- }
- setLeader(leader);
- if (character != NodeCharacter.FOLLOWER) {
- term.notifyAll();
- }
+ lastHeartbeatReceivedTime = System.currentTimeMillis();
}
+ setLeader(leader);
}
+
logger.debug("{} accepted the AppendEntryRequest for term: {}", name, localTerm);
return Response.RESPONSE_AGREE;
}
@@ -2324,17 +2320,19 @@ public abstract class RaftMember implements RaftMemberMBean {
* @param ack acknowledgement from an indirect receiver.
*/
public void acknowledgeAppendLog(AppendEntryResult ack) {
+ long operationStartTime = Statistic.RAFT_RECEIVER_HANDLE_APPEND_ACK.getOperationStartTime();
AppendNodeEntryHandler appendNodeEntryHandler =
sentLogHandlers.get(new Pair<>(ack.lastLogIndex, ack.lastLogTerm));
+ Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
if (appendNodeEntryHandler != null) {
appendNodeEntryHandler.onComplete(ack);
- Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
}
+ Statistic.RAFT_RECEIVER_HANDLE_APPEND_ACK.calOperationCostTimeFromStart(operationStartTime);
}
public void registerAppendLogHandler(
Pair<Long, Long> indexTerm, AppendNodeEntryHandler appendNodeEntryHandler) {
- sentLogHandlers.put(indexTerm, appendNodeEntryHandler);
+ sentLogHandlers.putIfAbsent(indexTerm, appendNodeEntryHandler);
}
public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
@@ -2404,4 +2402,8 @@ public abstract class RaftMember implements RaftMemberMBean {
public void setClientManager(ClientManager clientManager) {
this.clientManager = clientManager;
}
+
+ public LogRelay getLogRelay() {
+ return logRelay;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index 10538be31e..cc284f0bb4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.server.monitor;
+import org.apache.iotdb.cluster.log.LogRelay.RelayEntry;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -78,6 +79,7 @@ public class NodeReport {
long lastHeartbeatReceivedTime;
long prevLastLogIndex;
long maxAppliedLogIndex;
+ RelayEntry nextToRelay;
RaftMemberReport(
NodeCharacter character,
@@ -90,7 +92,8 @@ public class NodeReport {
boolean isReadOnly,
long lastHeartbeatReceivedTime,
long prevLastLogIndex,
- long maxAppliedLogIndex) {
+ long maxAppliedLogIndex,
+ RelayEntry nextToRelay) {
this.character = character;
this.leader = leader;
this.term = term;
@@ -102,6 +105,7 @@ public class NodeReport {
this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
this.prevLastLogIndex = prevLastLogIndex;
this.maxAppliedLogIndex = maxAppliedLogIndex;
+ this.nextToRelay = nextToRelay;
}
}
@@ -119,7 +123,8 @@ public class NodeReport {
boolean isReadOnly,
long lastHeartbeatReceivedTime,
long prevLastLogIndex,
- long maxAppliedLogIndex) {
+ long maxAppliedLogIndex,
+ RelayEntry nextToRelay) {
super(
character,
leader,
@@ -131,7 +136,8 @@ public class NodeReport {
isReadOnly,
lastHeartbeatReceivedTime,
prevLastLogIndex,
- maxAppliedLogIndex);
+ maxAppliedLogIndex,
+ nextToRelay);
}
@Override
@@ -212,7 +218,8 @@ public class NodeReport {
long headerLatency,
long lastHeartbeatReceivedTime,
long prevLastLogIndex,
- long maxAppliedLogIndex) {
+ long maxAppliedLogIndex,
+ RelayEntry nextToRelay) {
super(
character,
leader,
@@ -224,7 +231,8 @@ public class NodeReport {
isReadOnly,
lastHeartbeatReceivedTime,
prevLastLogIndex,
- maxAppliedLogIndex);
+ maxAppliedLogIndex,
+ nextToRelay);
this.header = header;
this.headerLatency = headerLatency;
}
@@ -254,6 +262,8 @@ public class NodeReport {
+ maxAppliedLogIndex
+ ", readOnly="
+ isReadOnly
+ + ", nextToRelay="
+ + nextToRelay
+ ", headerLatency="
+ headerLatency
+ "ns"
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 4524667b9d..395048eb80 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.server.monitor;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
/** NodeStatus contains the last-known spec and load of a node in the cluster. */
@SuppressWarnings("java:S1135")
@@ -53,6 +54,9 @@ public class NodeStatus implements Comparable<NodeStatus> {
// its lastDeactivatedTime is too old.
private long lastDeactivatedTime;
+ private AtomicLong sendEntryNum = new AtomicLong();
+ private AtomicLong sendEntryLatencySum = new AtomicLong();
+
// TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
@Override
public int compareTo(NodeStatus o) {
@@ -115,4 +119,34 @@ public class NodeStatus implements Comparable<NodeStatus> {
return isActivated
|| (System.currentTimeMillis() - lastDeactivatedTime) > DEACTIVATION_VALID_INTERVAL_MS;
}
+
+ public AtomicLong getSendEntryNum() {
+ return sendEntryNum;
+ }
+
+ public AtomicLong getSendEntryLatencySum() {
+ return sendEntryLatencySum;
+ }
+
+ @Override
+ public String toString() {
+ return "NodeStatus{"
+ + "status="
+ + status
+ + ", lastUpdateTime="
+ + lastUpdateTime
+ + ", lastResponseLatency="
+ + lastResponseLatency
+ + ", isActivated="
+ + isActivated
+ + ", lastDeactivatedTime="
+ + lastDeactivatedTime
+ + ", sendEntryNum="
+ + sendEntryNum
+ + ", sendEntryLatencySum="
+ + sendEntryLatencySum
+ + ", sendEntryLatencyAvg="
+ + (sendEntryLatencySum.get() * 1.0 / sendEntryNum.get())
+ + '}';
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index 6989bf97a0..eb79a84d2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,7 +47,7 @@ public class NodeStatusManager {
private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
// a status is considered stale if it is older than one minute and should be updated
- private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
+ private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 10L;
private static final NodeStatusManager INSTANCE = new NodeStatusManager();
private MetaGroupMember metaGroupMember;
@@ -147,7 +148,7 @@ public class NodeStatusManager {
} else {
nodeStatus.setLastResponseLatency(Long.MAX_VALUE);
}
- logger.info(
+ logger.debug(
"NodeStatus of {} is updated, status: {}, response time: {}",
node,
nodeStatus.getStatus(),
@@ -180,4 +181,8 @@ public class NodeStatusManager {
public boolean isActivated(Node node) {
return getNodeStatus(node, false).isActivated();
}
+
+ public Map<Node, NodeStatus> getNodeStatusMap() {
+ return Collections.unmodifiableMap(nodeStatusMap);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 4169935f64..13b8ec7bc3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -146,6 +146,16 @@ public class Timer {
RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_SENDER_SEND_LOG(
RAFT_MEMBER_SENDER, "send log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_SENDER_HANDLE_SEND_RESULT(
+ RAFT_MEMBER_SENDER,
+ "handle send log result",
+ TIME_SCALE,
+ true,
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_SENDER_RELAY_OFFER_LOG(
+ RAFT_MEMBER_SENDER, "relay offer log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_SENDER_RELAY_LOG(
+ RAFT_MEMBER_SENDER, "relay log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_SENDER_VOTE_COUNTER(
RAFT_MEMBER_SENDER,
"wait for votes",
@@ -237,14 +247,26 @@ public class Timer {
RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_RECEIVER_APPEND_ENTRY(
RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
- RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
- // log dispatcher
- LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
- LOG_DISPATCHER,
- "from create to queue",
+ RAFT_RECEIVER_APPEND_ACK(
+ RAFT_MEMBER_RECEIVER,
+ "ack append entrys",
TIME_SCALE,
true,
- META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_RECEIVER_APPEND_ENTRY_FULL(
+ RAFT_MEMBER_RECEIVER,
+ "append entrys(full)",
+ TIME_SCALE,
+ true,
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_RECEIVER_HANDLE_APPEND_ACK(
+ RAFT_MEMBER_SENDER,
+ "handle append entrys ack",
+ TIME_SCALE,
+ true,
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
+ // log dispatcher
LOG_DISPATCHER_LOG_ENQUEUE(
LOG_DISPATCHER,
"enqueue",
@@ -265,6 +287,24 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
+ LOG_DISPATCHER,
+ "from create to queue",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE(
+ LOG_DISPATCHER,
+ "from create to dequeue",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_FROM_CREATE_TO_SENDING(
+ LOG_DISPATCHER,
+ "from create to sending",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_FROM_CREATE_TO_SENT(
LOG_DISPATCHER,
"from create to sent",
@@ -288,7 +328,9 @@ public class Timer {
RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
- RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
+ RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
+ RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
+ RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT);
String className;
String blockName;
@@ -335,11 +377,13 @@ public class Timer {
* This method equals `add(System.nanoTime() - start)`. We wrap `System.nanoTime()` in this
* method to avoid unnecessary calls when instrumenting is disabled.
*/
- public void calOperationCostTimeFromStart(long startTime) {
+ public long calOperationCostTimeFromStart(long startTime) {
if (ENABLE_INSTRUMENTING && startTime != Long.MIN_VALUE && startTime != 0) {
long consumed = System.nanoTime() - startTime;
add(consumed);
+ return consumed;
}
+ return 0;
}
/** WARN: no current safety guarantee. */
@@ -363,6 +407,10 @@ public class Timer {
double avg = s / cnt;
return String.format("%s - %s: %.2f, %d, %.2f, %d", className, blockName, s, cnt, avg, max);
}
+
+ public long getCnt() {
+ return counter.get();
+ }
}
public static String getReport() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 7a6eb2c0c4..486c50959e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
@@ -160,7 +161,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
*/
@Override
public TNodeStatus queryNodeStatus() {
- return new TNodeStatus();
+ return new TNodeStatus()
+ .setFanoutRequestNum(
+ Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_SEND_RELAY.getCnt());
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java
new file mode 100644
index 0000000000..b2b94aa2e0
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WeightedList.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.utils;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A collection that randomly selects elements with probability proportional to their
+ * weights; selected elements are removed from the list.
+ *
+ * <p>NOTE(review): this class is not thread-safe — callers must provide external
+ * synchronization if shared across threads.
+ */
+public class WeightedList<T> {
+ private List<Pair<T, Double>> elements = new ArrayList<>();
+ private Random random = new Random();
+ // reusable cumulative-probability buffer; only the first elements.size() slots are valid
+ private double[] probabilities;
+ // running sum of all weights currently in `elements`
+ private double weightSum = 0.0;
+
+ /** Adds an element with the given weight. Weights are expected to be non-negative. */
+ public void insert(T t, Double weight) {
+ elements.add(new Pair<>(t, weight));
+ weightSum += weight;
+ }
+
+ /**
+ * Removes and returns up to {@code num} elements chosen by repeated weighted selection.
+ * If {@code num} covers the whole list, all elements are returned (in insertion order)
+ * and the list is emptied.
+ */
+ public List<T> select(int num) {
+ List<T> rst = new ArrayList<>(num);
+ if (num >= elements.size()) {
+ for (Pair<T, Double> element : elements) {
+ rst.add(element.left);
+ }
+ elements.clear();
+ weightSum = 0.0;
+ } else {
+ for (int i = 0; i < num; i++) {
+ rst.add(select());
+ }
+ }
+
+ return rst;
+ }
+
+ /**
+ * Removes and returns one element with probability proportional to its weight,
+ * or null if the list is empty.
+ */
+ public T select() {
+ if (elements.isEmpty()) {
+ return null;
+ }
+
+ int selectedIndex;
+ if (weightSum <= 0.0) {
+ // Degenerate case: all remaining weights are zero (or floating-point drift has
+ // consumed the sum). Dividing by weightSum would yield NaN probabilities and
+ // `p <= NaN` is always false, deterministically picking the last element.
+ // Fall back to a uniform random choice instead.
+ selectedIndex = random.nextInt(elements.size());
+ } else {
+ if (probabilities == null || probabilities.length < elements.size()) {
+ probabilities = new double[elements.size()];
+ }
+
+ // build the cumulative distribution over the current elements
+ probabilities[0] = elements.get(0).right / weightSum;
+ for (int i = 1; i < elements.size(); i++) {
+ probabilities[i] = elements.get(i).right / weightSum + probabilities[i - 1];
+ }
+
+ // pick the first bucket whose cumulative probability reaches p;
+ // default to the last bucket to absorb floating-point rounding
+ double p = random.nextDouble();
+ selectedIndex = elements.size() - 1;
+ for (int i = 0; i < elements.size() - 1; i++) {
+ if (p <= probabilities[i]) {
+ selectedIndex = i;
+ break;
+ }
+ }
+ }
+
+ Pair<T, Double> rst = elements.remove(selectedIndex);
+ weightSum -= rst.right;
+ return rst.left;
+ }
+
+ public int size() {
+ return elements.size();
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 77111cb0f3..3c82a4241a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -78,8 +78,7 @@ public class AppendNodeEntryHandlerTest {
handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
- handler.setReceiver(TestUtils.getNode(i));
- handler.setPeer(peer);
+ handler.setDirectReceiver(TestUtils.getNode(i));
handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
AppendEntryResult result = new AppendEntryResult();
@@ -114,8 +113,7 @@ public class AppendNodeEntryHandlerTest {
handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
- handler.setReceiver(TestUtils.getNode(i));
- handler.setPeer(peer);
+ handler.setDirectReceiver(TestUtils.getNode(i));
handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
AppendEntryResult result = new AppendEntryResult();
result.setStatus(Response.RESPONSE_AGREE);
@@ -141,8 +139,7 @@ public class AppendNodeEntryHandlerTest {
handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
- handler.setReceiver(TestUtils.getNode(0));
- handler.setPeer(peer);
+ handler.setDirectReceiver(TestUtils.getNode(0));
handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
votingLog.wait();
@@ -168,8 +165,7 @@ public class AppendNodeEntryHandlerTest {
handler.setLog(votingLog);
handler.setMember(member);
handler.setReceiverTerm(receiverTerm);
- handler.setReceiver(TestUtils.getNode(0));
- handler.setPeer(peer);
+ handler.setDirectReceiver(TestUtils.getNode(0));
handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
handler.onError(new TestException());
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index ea8edf73d9..f627c13621 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -235,7 +235,7 @@ struct PreviousFillRequest {
// the spec and load of a node, for query coordinating
struct TNodeStatus {
-
+ 1: required i64 fanoutRequestNum
}
struct GetAggrResultRequest {
@@ -283,6 +283,7 @@ struct AppendEntryResult {
2: optional i64 lastLogTerm;
3: optional i64 lastLogIndex;
4: optional RaftNode header;
+ 5: optional Node receiver;
}