You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/15 03:08:44 UTC
[iotdb] branch multi_leader_memory_pendingBatch_control updated: enhance TLogBatch struct
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multi_leader_memory_pendingBatch_control by this push:
new 66e50b85fb enhance TLogBatch struct
66e50b85fb is described below
commit 66e50b85fbf3111ace1913b9ceb802b40e769338
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Tue Nov 15 11:08:30 2022 +0800
enhance TLogBatch struct
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../common/request/IndexedConsensusRequest.java | 15 +++++------
.../multileader/logdispatcher/LogDispatcher.java | 16 +++---------
.../multileader/logdispatcher/PendingBatch.java | 4 ++-
.../service/MultiLeaderRPCServiceProcessor.java | 30 ++++++----------------
.../src/main/thrift/mutlileader.thrift | 2 +-
5 files changed, 22 insertions(+), 45 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index ca236da1c3..6b0cb2634d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -39,6 +39,12 @@ public class IndexedConsensusRequest implements IConsensusRequest {
this.searchIndex = searchIndex;
this.requests = requests;
this.syncIndex = -1L;
+ this.requests.forEach(
+ r -> {
+ ByteBuffer buffer = r.serializeToByteBuffer();
+ this.serializedRequests.add(buffer);
+ this.serializedSize += buffer.capacity();
+ });
}
public IndexedConsensusRequest(
@@ -61,15 +67,6 @@ public class IndexedConsensusRequest implements IConsensusRequest {
return serializedRequests;
}
- public void buildSerializedRequests() {
- this.requests.forEach(
- r -> {
- ByteBuffer buffer = r.serializeToByteBuffer();
- this.serializedRequests.add(buffer);
- this.serializedSize += buffer.capacity();
- });
- }
-
public long getSerializedSize() {
return serializedSize;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8c7a443d3c..f0b4791ea8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -44,7 +43,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -156,9 +154,6 @@ public class LogDispatcher {
}
public void offer(IndexedConsensusRequest request) {
- request.buildSerializedRequests();
- // we put the serialization step outside the synchronized block because it is stateless and
- // time-consuming
synchronized (this) {
threads.forEach(
thread -> {
@@ -500,18 +495,15 @@ public class LogDispatcher {
}
targetIndex = data.getSearchIndex() + 1;
// construct request from wal
- for (IConsensusRequest innerRequest : data.getRequests()) {
- logBatches.addTLogBatch(
- new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true));
- }
+ logBatches.addTLogBatch(
+ new TLogBatch(data.getSerializedRequests(), data.getSearchIndex(), true));
}
}
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, PendingBatch logBatches) {
- for (ByteBuffer innerRequest : request.getSerializedRequests()) {
- logBatches.addTLogBatch(new TLogBatch(innerRequest, request.getSearchIndex(), false));
- }
+ logBatches.addTLogBatch(
+ new TLogBatch(request.getSerializedRequests(), request.getSearchIndex(), false));
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
index 5d4685b19e..dbfb52d515 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.logdispatcher;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
+import java.nio.Buffer;
import java.util.ArrayList;
import java.util.List;
@@ -55,7 +56,8 @@ public class PendingBatch {
public void addTLogBatch(TLogBatch batch) {
batches.add(batch);
// TODO Maybe we need to add in additional fields for more accurate calculations
- serializedSize += batch.getData() == null ? 0 : batch.getData().length;
+ serializedSize +=
+ batch.getData() == null ? 0 : batch.getData().stream().mapToInt(Buffer::capacity).sum();
}
public boolean canAccumulate() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 5f90cfffb5..81fc62d6f5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
@@ -56,9 +55,8 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
+import java.util.stream.Collectors;
public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -105,28 +103,16 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
new BatchIndexedConsensusRequest(req.peerId);
// We use synchronized to ensure atomicity of executing multiple logs
if (!req.getBatches().isEmpty()) {
- List<IConsensusRequest> consensusRequests = new ArrayList<>();
- long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
for (TLogBatch batch : req.getBatches()) {
- IConsensusRequest request =
- batch.isFromWAL()
- ? new MultiLeaderConsensusRequest(batch.data)
- : new ByteBufferConsensusRequest(batch.data);
- // merge TLogBatch with same search index into one request
- if (batch.getSearchIndex() != currentSearchIndex) {
- requestsInThisBatch.add(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests));
- consensusRequests = new ArrayList<>();
- currentSearchIndex = batch.getSearchIndex();
- }
- consensusRequests.add(request);
- }
- // write last request
- if (!consensusRequests.isEmpty()) {
requestsInThisBatch.add(
impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests));
+ batch.getSearchIndex(),
+ batch.getData().stream()
+ .map(
+ batch.isFromWAL()
+ ? MultiLeaderConsensusRequest::new
+ : ByteBufferConsensusRequest::new)
+ .collect(Collectors.toList())));
}
}
TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 0336c19303..28e9047178 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -21,7 +21,7 @@ include "common.thrift"
namespace java org.apache.iotdb.consensus.multileader.thrift
struct TLogBatch {
- 1: required binary data
+ 1: required list<binary> data
2: required i64 searchIndex
3: required bool fromWAL
}