You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/11 04:08:50 UTC
[iotdb] branch master updated: Optimize MultileaderConsensus SyncLog (#6936)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3033146fa0 Optimize MultileaderConsensus SyncLog (#6936)
3033146fa0 is described below
commit 3033146fa01ce158efadf825787bc6b8d569892c
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Aug 11 12:08:45 2022 +0800
Optimize MultileaderConsensus SyncLog (#6936)
Co-authored-by: ZhangHongYin <46...@users.noreply.github.com>
---
...uest.java => BatchIndexedConsensusRequest.java} | 53 ++++---
.../common/request/IndexedConsensusRequest.java | 33 ++++-
.../iotdb/consensus/config/MultiLeaderConfig.java | 8 +-
.../multileader/MultiLeaderServerImpl.java | 24 +++-
.../multileader/logdispatcher/LogDispatcher.java | 39 +++--
.../service/MultiLeaderRPCServiceProcessor.java | 52 +++----
.../multileader/logdispatcher/SyncStatusTest.java | 4 +-
.../multileader/util/TestStateMachine.java | 40 ++++--
.../org/apache/iotdb/commons}/StepTracker.java | 29 +++-
.../statemachine/DataRegionStateMachine.java | 159 +++++++++++++++++++--
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 10 +-
11 files changed, 350 insertions(+), 101 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
similarity index 50%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
copy to consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
index de3aca433b..a4a38fc84f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -20,49 +20,44 @@
package org.apache.iotdb.consensus.common.request;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
-/** only used for multi-leader consensus. */
-public class IndexedConsensusRequest implements IConsensusRequest {
+public class BatchIndexedConsensusRequest implements IConsensusRequest {
- /** we do not need to serialize these two fields as they are useless in other nodes. */
- private final long searchIndex;
+ private boolean isStartSyncIndexInitialized;
+ private long startSyncIndex;
+ private long endSyncIndex;
+ private final List<IndexedConsensusRequest> requests;
- private final List<IConsensusRequest> requests;
-
- public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
- this.searchIndex = searchIndex;
- this.requests = requests;
+ public BatchIndexedConsensusRequest() {
+ this.requests = new LinkedList<>();
+ this.isStartSyncIndexInitialized = false;
}
- @Override
- public ByteBuffer serializeToByteBuffer() {
- throw new UnsupportedOperationException();
+ public void add(IndexedConsensusRequest request) {
+ if (!isStartSyncIndexInitialized) {
+ startSyncIndex = request.getSyncIndex();
+ isStartSyncIndexInitialized = true;
+ }
+ endSyncIndex = request.getSyncIndex();
+ this.requests.add(request);
}
- public List<IConsensusRequest> getRequests() {
- return requests;
+ public long getStartSyncIndex() {
+ return startSyncIndex;
}
- public long getSearchIndex() {
- return searchIndex;
+ public long getEndSyncIndex() {
+ return endSyncIndex;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- IndexedConsensusRequest that = (IndexedConsensusRequest) o;
- return searchIndex == that.searchIndex && requests.equals(that.requests);
+ public List<IndexedConsensusRequest> getRequests() {
+ return requests;
}
@Override
- public int hashCode() {
- return Objects.hash(searchIndex, requests);
+ public ByteBuffer serializeToByteBuffer() {
+ return null;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..1a61cf3a09 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.common.request;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@@ -29,11 +30,27 @@ public class IndexedConsensusRequest implements IConsensusRequest {
/** we do not need to serialize these two fields as they are useless in other nodes. */
private final long searchIndex;
- private final List<IConsensusRequest> requests;
+ private final long syncIndex;
+ private List<IConsensusRequest> requests;
+ private List<ByteBuffer> serializedRequests;
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
this.searchIndex = searchIndex;
this.requests = requests;
+ this.syncIndex = -1L;
+ }
+
+ public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) {
+ this.searchIndex = searchIndex;
+ this.serializedRequests = serializedRequests;
+ this.syncIndex = -1L;
+ }
+
+ public IndexedConsensusRequest(
+ long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
+ this.searchIndex = searchIndex;
+ this.requests = requests;
+ this.syncIndex = syncIndex;
}
@Override
@@ -45,10 +62,24 @@ public class IndexedConsensusRequest implements IConsensusRequest {
return requests;
}
+ public List<ByteBuffer> getSerializedRequests() {
+ return serializedRequests;
+ }
+
+ public List<ByteBuffer> buildSerializedRequests() {
+ List<ByteBuffer> result = new LinkedList<>();
+ this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
+ return result;
+ }
+
public long getSearchIndex() {
return searchIndex;
}
+ public long getSyncIndex() {
+ return syncIndex;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index e4f66f557b..9b4a72a4f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -247,9 +247,11 @@ public class MultiLeaderConfig {
}
public static class Builder {
- private int maxPendingRequestNumPerNode = 200;
- private int maxRequestPerBatch = 40;
- private int maxPendingBatch = 1;
+ private int maxPendingRequestNumPerNode = 600;
+ private int maxRequestPerBatch = 30;
+ // (IMPORTANT) This value must be the same as MAX_REQUEST_CACHE_SIZE
+ // in DataRegionStateMachine.
+ private int maxPendingBatch = 5;
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 3e8cbbfb3b..6ef1fab4ae 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -122,16 +122,23 @@ public class MultiLeaderServerImpl {
// TODO wal and memtable
TSStatus result = stateMachine.write(indexedConsensusRequest);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- logDispatcher.offer(indexedConsensusRequest);
+ // The index is used when constructing batches in LogDispatcher. If its value
+ // increases but the corresponding request does not exist or has not been put into
+ // the queue, the dispatcher will try to find the request in the WAL. This is not
+ // expected and slows down batch preparation.
+ // So we hold the lock to ensure that `offer()` and `incrementAndGet()` are executed
+ // atomically.
+ synchronized (index) {
+ logDispatcher.offer(indexedConsensusRequest);
+ index.incrementAndGet();
+ }
} else {
logger.debug(
"{}: write operation failed. searchIndex: {}. Code: {}",
thisNode.getGroupId(),
indexedConsensusRequest.getSearchIndex(),
result.getCode());
- index.decrementAndGet();
}
-
return result;
}
}
@@ -182,12 +189,13 @@ public class MultiLeaderServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
- return new IndexedConsensusRequest(index.incrementAndGet(), Collections.singletonList(request));
+ return new IndexedConsensusRequest(index.get() + 1, Collections.singletonList(request));
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
- List<IConsensusRequest> requests) {
- return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
+ long syncIndex, List<IConsensusRequest> requests) {
+ return new IndexedConsensusRequest(
+ ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
}
/**
@@ -217,4 +225,8 @@ public class MultiLeaderServerImpl {
public MultiLeaderConfig getConfig() {
return config;
}
+
+ public AtomicLong getIndexObject() {
+ return index;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ed41ec5e16..c918dac459 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -106,17 +107,22 @@ public class LogDispatcher {
}
public void offer(IndexedConsensusRequest request) {
+ List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
threads.forEach(
thread -> {
logger.debug(
- "{}: Push a log to the queue, where the queue length is {}",
+ "{}->{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
+ thread.getPeer().getEndpoint().getIp(),
thread.getPendingRequest().size());
- if (!thread.getPendingRequest().offer(request)) {
+ if (!thread
+ .getPendingRequest()
+ .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
logger.debug(
- "{}: Log queue of {} is full, ignore the log to this node",
+ "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
impl.getThisNode().getGroupId(),
- thread.getPeer());
+ thread.getPeer(),
+ request.getSearchIndex());
}
});
}
@@ -222,6 +228,7 @@ public class LogDispatcher {
PendingBatch batch;
List<TLogBatch> logBatches = new ArrayList<>();
long startIndex = syncStatus.getNextSendingIndex();
+ long maxIndexWhenBufferedRequestEmpty = startIndex;
logger.debug("[GetBatch] startIndex: {}", startIndex);
long endIndex;
if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
@@ -231,9 +238,12 @@ public class LogDispatcher {
impl.getThisNode().getGroupId(),
pendingRequest.size(),
bufferedRequest.size());
- pendingRequest.drainTo(
- bufferedRequest,
- config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+ synchronized (impl.getIndexObject()) {
+ pendingRequest.drainTo(
+ bufferedRequest,
+ config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+ maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
+ }
// remove all request that searchIndex < startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
while (iterator.hasNext()) {
@@ -245,8 +255,13 @@ public class LogDispatcher {
}
}
}
- if (bufferedRequest.isEmpty()) { // only execute this after a restart
- endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
+ // This branch is executed in several scenarios:
+ // 1. After a restart.
+ // 2. When getBatch() is invoked right after the pending requests have been consumed
+ // completely. To prevent inconsistency here, `maxIndexWhenBufferedRequestEmpty` is
+ // computed inside the synchronized block above.
+ if (bufferedRequest.isEmpty()) {
+ endIndex = constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug(
"{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
@@ -327,6 +342,7 @@ public class LogDispatcher {
currentIndex,
maxIndex,
iteratorIndex));
+ // Even if there are no WAL files, this code won't produce an error.
if (iteratorIndex != currentIndex) {
walEntryiterator.skipTo(currentIndex);
iteratorIndex = currentIndex;
@@ -355,9 +371,8 @@ public class LogDispatcher {
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, List<TLogBatch> logBatches) {
- for (IConsensusRequest innerRequest : request.getRequests()) {
- logBatches.add(
- new TLogBatch(innerRequest.serializeToByteBuffer(), request.getSearchIndex(), false));
+ for (ByteBuffer innerRequest : request.getSerializedRequests()) {
+ logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false));
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3fc63c7d99..70a0387c40 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.multileader.service;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
@@ -67,36 +68,37 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
return;
}
- List<TSStatus> statuses = new ArrayList<>();
+ BatchIndexedConsensusRequest requestsInThisBatch = new BatchIndexedConsensusRequest();
// We use synchronized to ensure atomicity of executing multiple logs
if (!req.getBatches().isEmpty()) {
- synchronized (impl.getStateMachine()) {
- List<IConsensusRequest> consensusRequests = new ArrayList<>();
- long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
- for (TLogBatch batch : req.getBatches()) {
- IConsensusRequest request =
- batch.isFromWAL()
- ? new MultiLeaderConsensusRequest(batch.data)
- : new ByteBufferConsensusRequest(batch.data);
- // merge TLogBatch with same search index into one request
- if (batch.getSearchIndex() != currentSearchIndex) {
- statuses.add(
- impl.getStateMachine()
- .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
- consensusRequests = new ArrayList<>();
- }
- consensusRequests.add(request);
- }
- // write last request
- if (!consensusRequests.isEmpty()) {
- statuses.add(
- impl.getStateMachine()
- .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ List<IConsensusRequest> consensusRequests = new ArrayList<>();
+ long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
+ for (TLogBatch batch : req.getBatches()) {
+ IConsensusRequest request =
+ batch.isFromWAL()
+ ? new MultiLeaderConsensusRequest(batch.data)
+ : new ByteBufferConsensusRequest(batch.data);
+ // merge TLogBatch with same search index into one request
+ if (batch.getSearchIndex() != currentSearchIndex) {
+ requestsInThisBatch.add(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests));
+ consensusRequests = new ArrayList<>();
+ currentSearchIndex = batch.getSearchIndex();
}
+ consensusRequests.add(request);
+ }
+ // write last request
+ if (!consensusRequests.isEmpty()) {
+ requestsInThisBatch.add(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests));
}
}
- logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
- resultHandler.onComplete(new TSyncLogRes(statuses));
+ TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
+ logger.debug(
+ "Execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
+ resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
} catch (Exception e) {
resultHandler.onError(e);
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index d243a80e77..db6b03377a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -138,7 +138,9 @@ public class SyncStatusTest {
i++) {
status.removeBatch(batchList.get(i));
Assert.assertEquals(
- config.getReplication().getMaxPendingBatch() / 2, status.getPendingBatches().size());
+ config.getReplication().getMaxPendingBatch()
+ - config.getReplication().getMaxPendingBatch() / 2,
+ status.getPendingBatches().size());
Assert.assertEquals(
config.getReplication().getMaxPendingBatch(), status.getNextSendingIndex());
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index eab940cf87..648c6282dd 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -30,6 +31,9 @@ import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -40,6 +44,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
+ private static final Logger logger = LoggerFactory.getLogger(TestStateMachine.class);
private final RequestSets requestSets = new RequestSets(ConcurrentHashMap.newKeySet());
public Set<IndexedConsensusRequest> getRequestSet() {
@@ -68,19 +73,36 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
@Override
public TSStatus write(IConsensusRequest request) {
synchronized (requestSets) {
- IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
- List<IConsensusRequest> transformedRequest = new ArrayList<>();
- for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
- ByteBuffer buffer = innerRequest.serializeToByteBuffer();
- transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
+ if (request instanceof IndexedConsensusRequest) {
+ writeOneRequest((IndexedConsensusRequest) request);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ } else if (request instanceof BatchIndexedConsensusRequest) {
+ BatchIndexedConsensusRequest batchIndexedConsensusRequest =
+ (BatchIndexedConsensusRequest) request;
+ List<TSStatus> subStatus = new ArrayList<>();
+ for (IndexedConsensusRequest innerRequest : batchIndexedConsensusRequest.getRequests()) {
+ writeOneRequest(innerRequest);
+ subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ }
+ return new TSStatus().setSubStatus(subStatus);
+ } else {
+ logger.error("Unknown request: {}", request);
+ return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
}
- requestSets.add(
- new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
- indexedConsensusRequest.getSearchIndex() != ConsensusReqReader.DEFAULT_SEARCH_INDEX);
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
}
+ private void writeOneRequest(IndexedConsensusRequest indexedConsensusRequest) {
+ List<IConsensusRequest> transformedRequest = new ArrayList<>();
+ for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
+ ByteBuffer buffer = innerRequest.serializeToByteBuffer();
+ transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
+ }
+ requestSets.add(
+ new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
+ indexedConsensusRequest.getSearchIndex() != ConsensusReqReader.DEFAULT_SEARCH_INDEX);
+ }
+
@Override
public synchronized DataSet read(IConsensusRequest request) {
if (request instanceof GetConsensusReqReaderPlan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
index b5e1fc88ad..98ca5a4aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.commons;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,18 +27,24 @@ import java.util.Map;
class Metric {
private static final Logger logger = LoggerFactory.getLogger(Metric.class);
- private static final int PRINT_RATE = 1000;
+ private static final int DEFAULT_PRINT_RATE = 1000;
public String stepName;
public long invokeCount;
public long totalTime;
public long lastCycleTime;
+ public int printRate;
public Metric(String stepName) {
+ this(stepName, DEFAULT_PRINT_RATE);
+ }
+
+ public Metric(String stepName, int printRate) {
this.stepName = stepName;
this.invokeCount = 0;
this.totalTime = 0;
this.lastCycleTime = 0;
+ this.printRate = printRate;
}
public void trace(long startTime, long endTime) {
@@ -47,7 +53,7 @@ class Metric {
}
public void tryPrint() {
- if (invokeCount % PRINT_RATE == 0) {
+ if (invokeCount % printRate == 0) {
logger.info(
String.format(
"step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last%dAVG: %fms",
@@ -56,8 +62,8 @@ class Metric {
invokeCount,
totalTime * 1.0 / 1000000,
totalTime * 1.0 / 1000000 / invokeCount,
- PRINT_RATE,
- (totalTime * 1.0 - lastCycleTime) / 1000000 / PRINT_RATE));
+ printRate,
+ (totalTime * 1.0 - lastCycleTime) / 1000000 / printRate));
lastCycleTime = totalTime;
}
}
@@ -74,7 +80,18 @@ public class StepTracker {
metrics.get().get(stepName).tryPrint();
}
+ public static void trace(String stepName, int printRate, long startTime, long endTime) {
+ if (metrics.get() == null) {
+ metrics.set(new HashMap<>());
+ }
+ metrics
+ .get()
+ .computeIfAbsent(stepName, key -> new Metric(stepName, printRate))
+ .trace(startTime, endTime);
+ metrics.get().get(stepName).tryPrint();
+ }
+
public static void cleanup() {
- metrics.set(null);
+ metrics.remove();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 818a09ab2e..fa7a4aa502 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
@@ -47,7 +48,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -58,8 +65,17 @@ public class DataRegionStateMachine extends BaseStateMachine {
private DataRegion region;
+ private static final int MAX_REQUEST_CACHE_SIZE = 5;
+ private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
+
+ private final Lock queueLock = new ReentrantLock();
+ private final Condition queueSortCondition = queueLock.newCondition();
+ private final PriorityQueue<InsertNodeWrapper> requestCache;
+ private long nextSyncIndex = -1;
+
public DataRegionStateMachine(DataRegion region) {
this.region = region;
+ this.requestCache = new PriorityQueue<>();
}
@Override
@@ -104,6 +120,136 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
+ /**
+ * This method is used to write requests received through MultiLeader SyncLog. It keeps the
+ * write order on the follower the same as on the leader. Besides guaranteeing the order, it
+ * also allows the deserialization of PlanNodes to happen concurrently.
+ */
+ private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
+ queueLock.lock();
+ try {
+ requestCache.add(insertNodeWrapper);
+ // If the queue is full and its head is not held by the current thread, wake up the waiting
+ // threads so that the thread holding the head can process it.
+ if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+ && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
+ queueSortCondition.signalAll();
+ }
+ while (true) {
+ // If the current InsertNode is the next expected InsertNode, write it
+ if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
+ requestCache.remove(insertNodeWrapper);
+ nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+ break;
+ }
+ // If no write thread holds the request at nextSyncIndex and the heap is full, write
+ // the head request. This keeps the overall write process correct when nextSyncIndex
+ // is not set. We do not persist nextSyncIndex, in order to reduce complexity.
+ // nextSyncIndex may not be set in the following cases:
+ // 1. When the system has just started
+ // 2. When an exception occurs during SyncLog
+ if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+ && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+ requestCache.remove();
+ nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+ break;
+ }
+ try {
+ boolean timeout =
+ !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
+ if (timeout) {
+ // Although the timeout was triggered, the current thread cannot write its request
+ // unless it holds the head request; otherwise some other thread holds the head
+ // request. In that case, the current thread awaits again until its request
+ // becomes the head request.
+ if (requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+ // The current thread holds the head request, so it can write it immediately.
+ logger.info(
+ "waiting target request timeout. current index: {}, target index: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ nextSyncIndex);
+ requestCache.remove(insertNodeWrapper);
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.warn(
+ "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ logger.debug(
+ "region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+ region.getDataRegionId(),
+ requestCache.size(),
+ insertNodeWrapper.getStartSyncIndex(),
+ insertNodeWrapper.getEndSyncIndex());
+ List<TSStatus> subStatus = new LinkedList<>();
+ for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+ subStatus.add(write(insertNode));
+ }
+ queueSortCondition.signalAll();
+ return new TSStatus().setSubStatus(subStatus);
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
+ private final long startSyncIndex;
+ private final long endSyncIndex;
+ private final List<InsertNode> insertNodes;
+
+ public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
+ this.startSyncIndex = startSyncIndex;
+ this.endSyncIndex = endSyncIndex;
+ this.insertNodes = new LinkedList<>();
+ }
+
+ @Override
+ public int compareTo(InsertNodeWrapper o) {
+ return Long.compare(startSyncIndex, o.startSyncIndex);
+ }
+
+ public void add(InsertNode insertNode) {
+ this.insertNodes.add(insertNode);
+ }
+
+ public long getStartSyncIndex() {
+ return startSyncIndex;
+ }
+
+ public long getEndSyncIndex() {
+ return endSyncIndex;
+ }
+
+ public List<InsertNode> getInsertNodes() {
+ return insertNodes;
+ }
+ }
+
+ private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
+ InsertNodeWrapper insertNodeWrapper =
+ new InsertNodeWrapper(batchRequest.getStartSyncIndex(), batchRequest.getEndSyncIndex());
+ for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
+ insertNodeWrapper.add(grabInsertNode(indexedRequest));
+ }
+ return insertNodeWrapper;
+ }
+
+ private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+ List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
+ for (IConsensusRequest req : indexedRequest.getRequests()) {
+ // PlanNode in IndexedConsensusRequest should always be InsertNode
+ InsertNode innerNode = (InsertNode) getPlanNode(req);
+ innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+ insertNodes.add(innerNode);
+ }
+ return mergeInsertNodes(insertNodes);
+ }
+
@Override
public List<File> getSnapshotFiles(File latestSnapshotRootDir) {
// TODO: implement this method
@@ -116,14 +262,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
try {
if (request instanceof IndexedConsensusRequest) {
IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
- List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
- for (IConsensusRequest req : indexedRequest.getRequests()) {
- // PlanNode in IndexedConsensusRequest should always be InsertNode
- InsertNode innerNode = (InsertNode) getPlanNode(req);
- innerNode.setSearchIndex(indexedRequest.getSearchIndex());
- insertNodes.add(innerNode);
- }
- planNode = mergeInsertNodes(insertNodes);
+ planNode = grabInsertNode(indexedRequest);
+ } else if (request instanceof BatchIndexedConsensusRequest) {
+ InsertNodeWrapper insertNodeWrapper =
+ deserializeAndWrap((BatchIndexedConsensusRequest) request);
+ return cacheAndInsertLatestNode(insertNodeWrapper);
} else {
planNode = getPlanNode(request);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 1a0a74de22..bf1b1fcdaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -88,6 +88,9 @@ public class WALNode implements IWALNode {
/** no multi-leader consensus, all insert nodes can be safely deleted */
public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
+ /** timeout threshold when waiting for next wal entry */
+ private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
+
/** unique identifier of this WALNode */
private final String identifier;
/** directory to store this node's files */
@@ -680,7 +683,12 @@ public class WALNode implements IWALNode {
@Override
public void waitForNextReady() throws InterruptedException {
while (!hasNext()) {
- buffer.waitForFlush();
+ boolean timeout =
+ !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
+ if (timeout) {
+ logger.info("timeout when waiting for next WAL entry ready, execute rollWALFile");
+ rollWALFile();
+ }
}
}