You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/03/04 13:52:37 UTC
[iotdb] 01/01: [IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch jira5613
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3d03d468ea5853ab325f9390075c0f8e57814db1
Author: Potato <ta...@apache.org>
AuthorDate: Fri Mar 3 23:13:34 2023 +0800
[IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)
* finish
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
* remove unused code
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
* call buildSerializedRequests for requests read from the WAL & adapt the UT
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
* fix UT
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---------
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../common/request/IndexedConsensusRequest.java | 19 ++++++++-----
.../consensus/iot/IoTConsensusServerImpl.java | 5 ++--
.../consensus/iot/logdispatcher/LogDispatcher.java | 33 +++++++++++++---------
.../consensus/iot/util/FakeConsensusReqReader.java | 4 ++-
4 files changed, 36 insertions(+), 25 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 0be89ce1be..58789dbd0e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -32,19 +32,14 @@ public class IndexedConsensusRequest implements IConsensusRequest {
private final long syncIndex;
private final List<IConsensusRequest> requests;
- private final List<ByteBuffer> serializedRequests = new ArrayList<>();
+ private final List<ByteBuffer> serializedRequests;
private long serializedSize = 0;
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
this.searchIndex = searchIndex;
this.requests = requests;
this.syncIndex = -1L;
- this.requests.forEach(
- r -> {
- ByteBuffer buffer = r.serializeToByteBuffer();
- this.serializedRequests.add(buffer);
- this.serializedSize += buffer.capacity();
- });
+ this.serializedRequests = new ArrayList<>(requests.size());
}
public IndexedConsensusRequest(
@@ -52,6 +47,16 @@ public class IndexedConsensusRequest implements IConsensusRequest {
this.searchIndex = searchIndex;
this.requests = requests;
this.syncIndex = syncIndex;
+ this.serializedRequests = new ArrayList<>(requests.size());
+ }
+
+ public void buildSerializedRequests() {
+ this.requests.forEach(
+ r -> {
+ ByteBuffer buffer = r.serializeToByteBuffer();
+ this.serializedRequests.add(buffer);
+ this.serializedSize += buffer.capacity();
+ });
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 2ab9746a31..16e378cde4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -199,8 +199,6 @@ public class IoTConsensusServerImpl {
}
}
long writeToStateMachineStartTime = System.nanoTime();
- IndexedConsensusRequest indexedConsensusRequest =
- buildIndexedConsensusRequestForLocalRequest(request);
// statistic the time of checking write block
MetricService.getInstance()
.getOrCreateHistogram(
@@ -213,6 +211,8 @@ public class IoTConsensusServerImpl {
Tag.REGION.toString(),
this.consensusGroupId)
.update(writeToStateMachineStartTime - getStateMachineLockTime);
+ IndexedConsensusRequest indexedConsensusRequest =
+ buildIndexedConsensusRequestForLocalRequest(request);
if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
logger.info(
"DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
@@ -220,7 +220,6 @@ public class IoTConsensusServerImpl {
getCurrentSafelyDeletedSearchIndex(),
indexedConsensusRequest.getSearchIndex());
}
- // TODO wal and memtable
IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
TSStatus result = stateMachine.write(planNode);
long writeToStateMachineEndTime = System.nanoTime();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 68a988558e..f6da0efd52 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -156,22 +156,26 @@ public class LogDispatcher {
}
public void offer(IndexedConsensusRequest request) {
- synchronized (this) {
- threads.forEach(
- thread -> {
- logger.debug(
- "{}->{}: Push a log to the queue, where the queue length is {}",
- impl.getThisNode().getGroupId(),
- thread.getPeer().getEndpoint().getIp(),
- thread.getPendingEntriesSize());
- if (!thread.offer(request)) {
+ // When replicaNum is 1 there are no dispatcher threads, so skip serializing and offering.
+ if (!threads.isEmpty()) {
+ request.buildSerializedRequests();
+ synchronized (this) {
+ threads.forEach(
+ thread -> {
logger.debug(
- "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+ "{}->{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
- thread.getPeer(),
- request.getSearchIndex());
- }
- });
+ thread.getPeer().getEndpoint().getIp(),
+ thread.getPendingEntriesSize());
+ if (!thread.offer(request)) {
+ logger.debug(
+ "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+ impl.getThisNode().getGroupId(),
+ thread.getPeer(),
+ request.getSearchIndex());
+ }
+ });
+ }
}
}
@@ -510,6 +514,7 @@ public class LogDispatcher {
}
}
targetIndex = data.getSearchIndex() + 1;
+ data.buildSerializedRequests();
// construct request from wal
logBatches.addTLogEntry(
new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true));
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
index c2560201a5..3a8c8e4e8c 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
@@ -53,6 +53,7 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
}
private class FakeConsensusReqIterator implements ConsensusReqReader.ReqIterator {
+
private long nextSearchIndex;
public FakeConsensusReqIterator(long startIndex) {
@@ -70,7 +71,8 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
for (IndexedConsensusRequest indexedConsensusRequest : requestSets.getRequestSet()) {
if (indexedConsensusRequest.getSearchIndex() == nextSearchIndex) {
nextSearchIndex++;
- return indexedConsensusRequest;
+ return new IndexedConsensusRequest(
+ indexedConsensusRequest.getSearchIndex(), indexedConsensusRequest.getRequests());
}
}
return null;