You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/15 07:32:50 UTC
[iotdb] 01/01: add logs
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch multi_leader_duplicate_entry_debug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79ea36bdbf39f58f491c9b4ebc73928b320f5d5f
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Tue Nov 15 15:32:28 2022 +0800
add logs
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../multileader/logdispatcher/LogDispatcher.java | 27 ++++++++++++++++++++--
.../service/MultiLeaderRPCServiceProcessor.java | 8 +++++++
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index f0b4791ea8..5aea9d6c7a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -396,6 +396,16 @@ public class LogDispatcher {
return batches;
}
+ if (!batches.isEmpty()
+ && batches.getBatches().get(batches.getBatches().size() - 1).getSearchIndex()
+ == prev.getSearchIndex()) {
+ logger.error(
+ "{} : encounter duplicated request {} for pendingBatch {} from wal and queue",
+ impl.getThisNode().getGroupId(),
+ prev,
+ batches);
+ }
+
constructBatchIndexedFromConsensusRequest(prev, batches);
iterator.remove();
releaseReservedMemory(prev);
@@ -421,6 +431,14 @@ public class LogDispatcher {
return batches;
}
}
+ if (batches.getBatches().get(batches.getBatches().size() - 1).getSearchIndex()
+ == current.getSearchIndex()) {
+ logger.error(
+ "{} : encounter duplicated request {} for pendingBatch {} from wal and queue",
+ impl.getThisNode().getGroupId(),
+ current,
+ batches);
+ }
constructBatchIndexedFromConsensusRequest(current, batches);
prev = current;
// We might not be able to remove all the elements in the bufferedRequest in the
@@ -478,18 +496,23 @@ public class LogDispatcher {
IndexedConsensusRequest data = walEntryIterator.next();
if (data.getSearchIndex() < targetIndex) {
// if the index of request is smaller than currentIndex, then continue
- logger.warn(
+ logger.error(
"search for one Entry which index is {}, but find a smaller one, index : {}",
targetIndex,
data.getSearchIndex());
continue;
} else if (data.getSearchIndex() > targetIndex) {
- logger.warn(
+ logger.error(
"search for one Entry which index is {}, but find a larger one, index : {}",
targetIndex,
data.getSearchIndex());
if (data.getSearchIndex() >= maxIndex) {
// if the index of request is larger than maxIndex, then finish
+ logger.error(
+ "break constructBatchFromWAL({}, {}) when encounter request {}",
+ currentIndex,
+ maxIndex,
+ data);
break;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 67bceadd12..1f9f80584c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -102,6 +102,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
BatchIndexedConsensusRequest requestsInThisBatch =
new BatchIndexedConsensusRequest(req.peerId);
// We use synchronized to ensure atomicity of executing multiple logs
+ long searchIndex = -1;
for (TLogBatch batch : req.getBatches()) {
requestsInThisBatch.add(
impl.buildIndexedConsensusRequestForRemoteRequest(
@@ -112,6 +113,13 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
? MultiLeaderConsensusRequest::new
: ByteBufferConsensusRequest::new)
.collect(Collectors.toList())));
+ if (searchIndex != -1 && searchIndex == batch.searchIndex) {
+ logger.error(
+ "execute TSyncLogReq for {} with duplicated searchIndex {}",
+ req.consensusGroupId,
+ req);
+ }
+ searchIndex = batch.getSearchIndex();
}
TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
logger.debug(