You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/10/21 11:47:41 UTC
[iotdb] 02/02: add more debug log for log dispatch
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_metrics_add_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e01c09d785c5e0ed3dfaba9ddca1854622d3f339
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Oct 21 19:47:21 2022 +0800
add more debug log for log dispatch
---
.../multileader/logdispatcher/LogDispatcher.java | 6 ++---
.../service/MultiLeaderRPCServiceProcessor.java | 7 +++++-
.../apache/iotdb/db/client/ConfigNodeClient.java | 1 +
.../statemachine/DataRegionStateMachine.java | 29 +++++++++++++++++++---
4 files changed, 35 insertions(+), 8 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 307fc141f7..67d6526077 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -425,10 +425,10 @@ public class LogDispatcher {
new TSyncLogReq(
selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
logger.debug(
- "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
+ "[MultiLeader][DataRegion-{}] start={} Send Batch to {}",
+ peer.getGroupId().getId(),
batch.getStartIndex(),
- batch.getEndIndex(),
- peer.getGroupId().convertToTConsensusGroupId());
+ peer.getEndpoint());
client.syncLog(req, handler);
} catch (IOException | TException e) {
logger.error("Can not sync logs to peer {} because", peer, e);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3f045e5e66..8323b7c635 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -69,6 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
@Override
public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
try {
+ logger.debug("[MultiLeader][DataRegion-{}], receive TSyncLogReq", req.consensusGroupId.id);
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
MultiLeaderServerImpl impl = consensus.getImpl(groupId);
@@ -125,9 +126,13 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
currentSearchIndex, consensusRequests));
}
}
+ logger.debug(
+ "[MultiLeader][DataRegion-{}] start={}, construct BatchIndexedConsensusRequest.",
+ req.consensusGroupId,
+ requestsInThisBatch.getStartSyncIndex());
TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
logger.debug(
- "execute TSyncLogReq for {} with result {}", req.consensusGroupId, writeStatus.subStatus);
+ "[MultiLeader][DataRegion-{}] execute complete TSyncLogReq", req.consensusGroupId);
resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
} catch (Exception e) {
resultHandler.onError(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index df0e0b65e8..4a7b524d9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -293,6 +293,7 @@ public class ConfigNodeClient
}
configNodes = newConfigNodes;
} catch (TException e) {
+ logger.error("error when registering DataNode to {} ", configLeader, e);
configLeader = null;
}
reconnect();
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 0d90993355..50925f36ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -158,7 +158,15 @@ public class DataRegionStateMachine extends BaseStateMachine {
* deserialization of PlanNode to be concurrent
*/
private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
+ logger.debug(
+ "[MultiLeader][DataRegion-{}] start={}, get enqueue lock.",
+ region.getDataRegionId(),
+ insertNodeWrapper.getStartSyncIndex());
queueLock.lock();
+ logger.debug(
+ "[MultiLeader][DataRegion-{}] start={}, enqueue lock got.",
+ region.getDataRegionId(),
+ insertNodeWrapper.getStartSyncIndex());
try {
requestCache.add(insertNodeWrapper);
// If the peek is not hold by current thread, it should notify the corresponding thread to
@@ -214,16 +222,19 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
logger.debug(
- "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
- sourcePeerId,
+ "[MultiLeader][DataRegion-{}] start={}, source = {}, get from queue, queue size {}",
region.getDataRegionId(),
- requestCache.size(),
insertNodeWrapper.getStartSyncIndex(),
- insertNodeWrapper.getEndSyncIndex());
+ sourcePeerId,
+ requestCache.size());
List<TSStatus> subStatus = new LinkedList<>();
for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
subStatus.add(write(planNode));
}
+ logger.debug(
+ "[MultiLeader][DataRegion-{}] start={}, write done.",
+ region.getDataRegionId(),
+ insertNodeWrapper.getStartSyncIndex());
queueSortCondition.signalAll();
return new TSStatus().setSubStatus(subStatus);
} finally {
@@ -263,6 +274,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
public List<PlanNode> getInsertNodes() {
return insertNodes;
}
+
+ public String toString() {
+ return String.format(
+ "InsertNodeWrapper[startSyncIndex:%d, endSyncIndex:%d, size:%d]",
+ startSyncIndex, endSyncIndex, insertNodes.size());
+ }
}
private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
@@ -321,6 +338,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
planNode = grabInsertNode(indexedRequest);
} else if (request instanceof BatchIndexedConsensusRequest) {
+ logger.debug(
+ "DataRegion {} prepare to write BatchIndexedConsensusRequest. start = {}",
+ region.getDataRegionId(),
+ ((BatchIndexedConsensusRequest) request).getStartSyncIndex());
InsertNodeWrapper insertNodeWrapper =
deserializeAndWrap((BatchIndexedConsensusRequest) request);
String sourcePeerId = ((BatchIndexedConsensusRequest) request).getSourcePeerId();