You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/03 02:56:45 UTC
[iotdb] branch ml_0729_test_exp1 updated: change to serialize for all write request for multi leader
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test_exp1 by this push:
new d0cf6a5487 change to serialize for all write request for multi leader
d0cf6a5487 is described below
commit d0cf6a5487d373cc4543d593cf695941d6d42eea
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 3 10:56:33 2022 +0800
change to serialize for all write request for multi leader
---
.../multileader/logdispatcher/LogDispatcher.java | 27 +++++++++++++---------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 6929e27831..e465c16698 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -107,20 +107,17 @@ public class LogDispatcher {
}
public void offer(IndexedConsensusRequest request) {
+ List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
threads.forEach(
thread -> {
logger.debug(
"{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
thread.getPendingRequest().size());
- if (thread.getPendingRequest().size()
- < thread.config.getReplication().getMaxPendingRequestNumPerNode()) {
- thread
- .getPendingRequest()
- .add(
- new IndexedConsensusRequest(
- request.buildSerializedRequests(), request.getSearchIndex()));
- thread.countQueue();
+ if (thread
+ .getPendingRequest()
+ .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+ thread.countQueue(request.getSearchIndex());
} else {
logger.debug(
"{}: Log queue of {} is full, ignore the log to this node",
@@ -130,6 +127,14 @@ public class LogDispatcher {
});
}
+ private boolean needPutIntoQueue() {
+ return threads.stream()
+ .anyMatch(
+ t ->
+ t.getPendingRequest().size()
+ < t.config.getReplication().getMaxPendingRequestNumPerNode());
+ }
+
public class LogDispatcherThread implements Runnable {
private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
private final MultiLeaderConfig config;
@@ -162,12 +167,12 @@ public class LogDispatcher {
this.walEntryiterator = reader.getReqIterator(iteratorIndex);
}
- public void countQueue() {
+ public void countQueue(long searchIndex) {
this.queueCount++;
logger.info(
String.format(
- "DataRegion[%s]->%s: total request from queue - [%d]",
- peer.getGroupId().getId(), peer.getEndpoint().ip, queueCount));
+ "DataRegion[%s]->%s: total request from queue: [%d], requestIndex: [%d]",
+ peer.getGroupId().getId(), peer.getEndpoint().ip, queueCount, searchIndex));
}
public IndexController getController() {