You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/15 07:32:50 UTC

[iotdb] 01/01: add logs

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch multi_leader_duplicate_entry_debug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 79ea36bdbf39f58f491c9b4ebc73928b320f5d5f
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Tue Nov 15 15:32:28 2022 +0800

    add logs
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../multileader/logdispatcher/LogDispatcher.java   | 27 ++++++++++++++++++++--
 .../service/MultiLeaderRPCServiceProcessor.java    |  8 +++++++
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index f0b4791ea8..5aea9d6c7a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -396,6 +396,16 @@ public class LogDispatcher {
           return batches;
         }
 
+        if (!batches.isEmpty()
+            && batches.getBatches().get(batches.getBatches().size() - 1).getSearchIndex()
+                == prev.getSearchIndex()) {
+          logger.error(
+              "{} : encounter duplicated request {} for pendingBatch {} from wal and queue",
+              impl.getThisNode().getGroupId(),
+              prev,
+              batches);
+        }
+
         constructBatchIndexedFromConsensusRequest(prev, batches);
         iterator.remove();
         releaseReservedMemory(prev);
@@ -421,6 +431,14 @@ public class LogDispatcher {
               return batches;
             }
           }
+          if (batches.getBatches().get(batches.getBatches().size() - 1).getSearchIndex()
+              == current.getSearchIndex()) {
+            logger.error(
+                "{} : encounter duplicated request {} for pendingBatch {} from wal and queue",
+                impl.getThisNode().getGroupId(),
+                current,
+                batches);
+          }
           constructBatchIndexedFromConsensusRequest(current, batches);
           prev = current;
           // We might not be able to remove all the elements in the bufferedRequest in the
@@ -478,18 +496,23 @@ public class LogDispatcher {
         IndexedConsensusRequest data = walEntryIterator.next();
         if (data.getSearchIndex() < targetIndex) {
           // if the index of request is smaller than currentIndex, then continue
-          logger.warn(
+          logger.error(
               "search for one Entry which index is {}, but find a smaller one, index : {}",
               targetIndex,
               data.getSearchIndex());
           continue;
         } else if (data.getSearchIndex() > targetIndex) {
-          logger.warn(
+          logger.error(
               "search for one Entry which index is {}, but find a larger one, index : {}",
               targetIndex,
               data.getSearchIndex());
           if (data.getSearchIndex() >= maxIndex) {
             // if the index of request is larger than maxIndex, then finish
+            logger.error(
+                "break constructBatchFromWAL({}, {}) when encounter request {}",
+                currentIndex,
+                maxIndex,
+                data);
             break;
           }
         }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 67bceadd12..1f9f80584c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -102,6 +102,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
       BatchIndexedConsensusRequest requestsInThisBatch =
           new BatchIndexedConsensusRequest(req.peerId);
       // We use synchronized to ensure atomicity of executing multiple logs
+      long searchIndex = -1;
       for (TLogBatch batch : req.getBatches()) {
         requestsInThisBatch.add(
             impl.buildIndexedConsensusRequestForRemoteRequest(
@@ -112,6 +113,13 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
                             ? MultiLeaderConsensusRequest::new
                             : ByteBufferConsensusRequest::new)
                     .collect(Collectors.toList())));
+        if (searchIndex != -1 && searchIndex == batch.searchIndex) {
+          logger.error(
+              "execute TSyncLogReq for {} with duplicated searchIndex {}",
+              req.consensusGroupId,
+              req);
+        }
+        searchIndex = batch.getSearchIndex();
       }
       TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
       logger.debug(