You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/10/21 11:47:41 UTC

[iotdb] 02/02: add more debug log for log dispatch

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_metrics_add_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e01c09d785c5e0ed3dfaba9ddca1854622d3f339
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Oct 21 19:47:21 2022 +0800

    add more debug log for log dispatch
---
 .../multileader/logdispatcher/LogDispatcher.java   |  6 ++---
 .../service/MultiLeaderRPCServiceProcessor.java    |  7 +++++-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  1 +
 .../statemachine/DataRegionStateMachine.java       | 29 +++++++++++++++++++---
 4 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 307fc141f7..67d6526077 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -425,10 +425,10 @@ public class LogDispatcher {
             new TSyncLogReq(
                 selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
         logger.debug(
-            "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
+            "[MultiLeader][DataRegion-{}] start={} Send Batch to {}",
+            peer.getGroupId().getId(),
             batch.getStartIndex(),
-            batch.getEndIndex(),
-            peer.getGroupId().convertToTConsensusGroupId());
+            peer.getEndpoint());
         client.syncLog(req, handler);
       } catch (IOException | TException e) {
         logger.error("Can not sync logs to peer {} because", peer, e);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3f045e5e66..8323b7c635 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -69,6 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
   @Override
   public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
     try {
+      logger.debug("[MultiLeader][DataRegion-{}], receive TSyncLogReq", req.consensusGroupId.id);
       ConsensusGroupId groupId =
           ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
       MultiLeaderServerImpl impl = consensus.getImpl(groupId);
@@ -125,9 +126,13 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
                   currentSearchIndex, consensusRequests));
         }
       }
+      logger.debug(
+          "[MultiLeader][DataRegion-{}] start={}, construct BatchIndexedConsensusRequest.",
+          req.consensusGroupId,
+          requestsInThisBatch.getStartSyncIndex());
       TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
       logger.debug(
-          "execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
+          "[MultiLeader][DataRegion-{}] execute complete TSyncLogReq", req.consensusGroupId);
       resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
     } catch (Exception e) {
       resultHandler.onError(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index df0e0b65e8..4a7b524d9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -293,6 +293,7 @@ public class ConfigNodeClient
         }
         configNodes = newConfigNodes;
       } catch (TException e) {
+        logger.error("error when registering DataNode to {} ", configLeader, e);
         configLeader = null;
       }
       reconnect();
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 0d90993355..50925f36ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -158,7 +158,15 @@ public class DataRegionStateMachine extends BaseStateMachine {
      * deserialization of PlanNode to be concurrent
      */
     private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
+      logger.debug(
+          "[MultiLeader][DataRegion-{}] start={}, get enqueue lock.",
+          region.getDataRegionId(),
+          insertNodeWrapper.getStartSyncIndex());
       queueLock.lock();
+      logger.debug(
+          "[MultiLeader][DataRegion-{}] start={}, enqueue lock got.",
+          region.getDataRegionId(),
+          insertNodeWrapper.getStartSyncIndex());
       try {
         requestCache.add(insertNodeWrapper);
         // If the peek is not hold by current thread, it should notify the corresponding thread to
@@ -214,16 +222,19 @@ public class DataRegionStateMachine extends BaseStateMachine {
           }
         }
         logger.debug(
-            "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
-            sourcePeerId,
+            "[MultiLeader][DataRegion-{}] start={}, source = {}, get from queue, queue size {}",
             region.getDataRegionId(),
-            requestCache.size(),
             insertNodeWrapper.getStartSyncIndex(),
-            insertNodeWrapper.getEndSyncIndex());
+            sourcePeerId,
+            requestCache.size());
         List<TSStatus> subStatus = new LinkedList<>();
         for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
           subStatus.add(write(planNode));
         }
+        logger.debug(
+            "[MultiLeader][DataRegion-{}] start={}, write done.",
+            region.getDataRegionId(),
+            insertNodeWrapper.getStartSyncIndex());
         queueSortCondition.signalAll();
         return new TSStatus().setSubStatus(subStatus);
       } finally {
@@ -263,6 +274,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
     public List<PlanNode> getInsertNodes() {
       return insertNodes;
     }
+
+    public String toString() {
+      return String.format(
+          "InsertNodeWrapper[startSyncIndex:%d, endSyncIndex:%d, size:%d]",
+          startSyncIndex, endSyncIndex, insertNodes.size());
+    }
   }
 
   private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
@@ -321,6 +338,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
         IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
         planNode = grabInsertNode(indexedRequest);
       } else if (request instanceof BatchIndexedConsensusRequest) {
+        logger.debug(
+            "DataRegion {} prepare to write BatchIndexedConsensusRequest. start = {}",
+            region.getDataRegionId(),
+            ((BatchIndexedConsensusRequest) request).getStartSyncIndex());
         InsertNodeWrapper insertNodeWrapper =
             deserializeAndWrap((BatchIndexedConsensusRequest) request);
         String sourcePeerId = ((BatchIndexedConsensusRequest) request).getSourcePeerId();