You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 08:34:49 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5835] Fix wal accumulation caused by datanode restart
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new f909d67635 [To rel/1.1][IOTDB-5835] Fix wal accumulation caused by datanode restart
f909d67635 is described below
commit f909d676354a88aa5fa8b41da828536f3f6e4ed0
Author: Potato <ta...@apache.org>
AuthorDate: Fri Apr 28 16:34:40 2023 +0800
[To rel/1.1][IOTDB-5835] Fix wal accumulation caused by datanode restart
---
.../apache/iotdb/consensus/iot/IoTConsensusServerImpl.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 201322fa48..2d303f5a91 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -138,9 +138,12 @@ public class IoTConsensusServerImpl {
this.config = config;
this.logDispatcher = new LogDispatcher(this, clientManager);
reader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
- long currentSearchIndex = reader.getCurrentSearchIndex();
+ this.searchIndex = new AtomicLong(reader.getCurrentSearchIndex());
+ // Since the underlying wal does not persist safelyDeletedSearchIndex, IoTConsensus needs to
+ // update wal with its syncIndex recovered from the consensus layer when initializing.
+ // This prevents wal from being piled up if the safelyDeletedSearchIndex is not updated after
+ // the restart and Leader migration occurs
checkAndUpdateSafeDeletedSearchIndex();
- this.searchIndex = new AtomicLong(currentSearchIndex);
this.consensusGroupId = thisNode.getGroupId().toString();
this.metrics = new IoTConsensusServerMetrics(this);
}
@@ -817,12 +820,14 @@ public class IoTConsensusServerImpl {
}
/**
- * only one configuration means single replica, then we can set safelyDeletedSearchIndex to
- * Long.MAX_VALUE.
+ * If there is only one replica, set it to Long.MAX_VALUE.、 If there are multiple replicas, get
+ * the latest SafelyDeletedSearchIndex again. This enables wal to be deleted in a timely manner.
*/
public void checkAndUpdateSafeDeletedSearchIndex() {
if (configuration.size() == 1) {
reader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
+ } else {
+ reader.setSafelyDeletedSearchIndex(getCurrentSafelyDeletedSearchIndex());
}
}