You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/02 09:12:29 UTC
[iotdb] branch ml_0729_test_exp1 updated: remove serialization operation before dispatch from queue
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test_exp1 by this push:
new 4ccd7e9cc1 remove serialization operation before dispatch from queue
4ccd7e9cc1 is described below
commit 4ccd7e9cc1ce974f6011bdea08e4b00405d922af
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 2 17:12:18 2022 +0800
remove serialization operation before dispatch from queue
---
.../common/request/IndexedConsensusRequest.java | 20 +++++++++++++++++++-
.../iotdb/consensus/config/MultiLeaderConfig.java | 2 +-
.../multileader/logdispatcher/LogDispatcher.java | 15 +++++++++++----
3 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 9c5112fa2b..f90dc6d86b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.common.request;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@@ -30,7 +31,8 @@ public class IndexedConsensusRequest implements IConsensusRequest {
private final long searchIndex;
private final long syncIndex;
- private final List<IConsensusRequest> requests;
+ private List<IConsensusRequest> requests;
+ private List<ByteBuffer> serializedRequests;
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
this.searchIndex = searchIndex;
@@ -38,6 +40,12 @@ public class IndexedConsensusRequest implements IConsensusRequest {
this.syncIndex = -1L;
}
+ public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) { // builds a request from already-serialized buffers; `requests` is not assigned on this path
+ this.searchIndex = searchIndex;
+ this.serializedRequests = serializedRequests; // stored by reference, no defensive copy
+ this.syncIndex = -1L; // same -1L value the (searchIndex, requests) constructor assigns
+ }
+
public IndexedConsensusRequest(
long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
this.searchIndex = searchIndex;
@@ -54,6 +62,16 @@ public class IndexedConsensusRequest implements IConsensusRequest {
return requests;
}
+ public List<ByteBuffer> getSerializedRequests() { // exposes the field directly (no copy)
+ return serializedRequests; // NOTE(review): in the visible code only the (serializedRequests, searchIndex) constructor assigns this field - confirm callers never reach here on other instances
+ }
+
+ /**
+  * Serializes every element of {@code requests} with {@code serializeToByteBuffer()} and returns
+  * the resulting buffers in iteration order.
+  *
+  * <p>A fresh list is built on each call. The {@code serializedRequests} field is deliberately
+  * not touched: in the visible code it is only assigned by the constructor that takes
+  * already-serialized buffers, so appending to it here would dereference an unassigned field.
+  *
+  * @return a new list holding one serialized buffer per inner request
+  */
+ public List<ByteBuffer> buildSerializedRequests() {
+ List<ByteBuffer> result = new LinkedList<>();
+ // Accumulate into the local list that is actually returned.
+ this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
+ return result;
+ }
+
public long getSearchIndex() {
return searchIndex;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index fa6b9db6a5..1a9b4dac9f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -247,7 +247,7 @@ public class MultiLeaderConfig {
}
public static class Builder {
- private int maxPendingRequestNumPerNode = 1000;
+ private int maxPendingRequestNumPerNode = 200;
private int maxRequestPerBatch = 40;
private int maxPendingBatch = 5;
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ed41ec5e16..ab35102254 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -112,7 +113,14 @@ public class LogDispatcher {
"{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
thread.getPendingRequest().size());
- if (!thread.getPendingRequest().offer(request)) {
+ if (thread.getPendingRequest().size()
+ < thread.config.getReplication().getMaxPendingRequestNumPerNode()) {
+ thread
+ .getPendingRequest()
+ .add(
+ new IndexedConsensusRequest(
+ request.buildSerializedRequests(), request.getSearchIndex()));
+ } else {
logger.debug(
"{}: Log queue of {} is full, ignore the log to this node",
impl.getThisNode().getGroupId(),
@@ -355,9 +363,8 @@ public class LogDispatcher {
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, List<TLogBatch> logBatches) {
- for (IConsensusRequest innerRequest : request.getRequests()) {
- logBatches.add(
- new TLogBatch(innerRequest.serializeToByteBuffer(), request.getSearchIndex(), false));
+ for (ByteBuffer innerRequest : request.getSerializedRequests()) { // reads pre-serialized buffers instead of serializing here; NOTE(review): requires `request` to carry serializedRequests - confirm for every caller
+ logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false)); // one TLogBatch per buffer, all tagged with the request's search index
}
}
}