You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/31 15:28:54 UTC
[iotdb] branch master updated: [IOTDB-3350] Recover wal search index when recovering from multi-leader (#6094)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 45e46bf105 [IOTDB-3350] Recover wal search index when recovering from multi-leader (#6094)
45e46bf105 is described below
commit 45e46bf105c62673943514f6d1df1928f45abce0
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue May 31 23:28:49 2022 +0800
[IOTDB-3350] Recover wal search index when recovering from multi-leader (#6094)
---
.../iotdb/db/engine/storagegroup/DataRegion.java | 1 +
.../java/org/apache/iotdb/db/wal/WALManager.java | 6 ++-
.../allocation/AbstractNodeAllocationStrategy.java | 5 ++-
.../db/wal/allocation/FirstCreateStrategy.java | 6 ++-
.../iotdb/db/wal/buffer/AbstractWALBuffer.java | 6 ++-
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 7 ++--
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 7 ++--
.../iotdb/db/wal/recover/WALNodeRecoverTask.java | 45 ++++++++++++++++++----
.../db/wal/allocation/FirstCreateStrategyTest.java | 3 +-
9 files changed, 64 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 642c68e760..8f5610194e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3634,6 +3634,7 @@ public class DataRegion {
return idTable;
}
+ /** This method can only be used in multi-leader consensus */
public IWALNode getWALNode() {
if (!config
.getDataRegionConsensusProtocolClass()
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
index 35c8582bef..8e028ad09a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
@@ -75,7 +75,9 @@ public class WALManager implements IService {
return walNodesManager.applyForWALNode(applicantUniqueId);
}
- public void registerWALNode(String applicantUniqueId, String logDirectory, int startFileVersion) {
+ public void registerWALNode(
+ String applicantUniqueId, String logDirectory, int startFileVersion, long startSearchIndex) {
+ String s = config.getDataRegionConsensusProtocolClass();
if (config.getWalMode() == WALMode.DISABLE
|| !config
.getDataRegionConsensusProtocolClass()
@@ -84,7 +86,7 @@ public class WALManager implements IService {
}
((FirstCreateStrategy) walNodesManager)
- .registerWALNode(applicantUniqueId, logDirectory, startFileVersion);
+ .registerWALNode(applicantUniqueId, logDirectory, startFileVersion, startSearchIndex);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
index f3e0851006..3b892e847c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
@@ -77,9 +77,10 @@ public abstract class AbstractNodeAllocationStrategy implements NodeAllocationSt
}
}
- protected IWALNode createWALNode(String identifier, String folder, int startFileVersion) {
+ protected IWALNode createWALNode(
+ String identifier, String folder, int startFileVersion, long startSearchIndex) {
try {
- return new WALNode(identifier, folder, startFileVersion);
+ return new WALNode(identifier, folder, startFileVersion, startSearchIndex);
} catch (FileNotFoundException e) {
logger.error("Fail to create wal node", e);
return WALFakeNode.getFailureInstance(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
index 2fda7da63e..a008ee3a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -60,14 +60,16 @@ public class FirstCreateStrategy extends AbstractNodeAllocationStrategy {
}
}
- public void registerWALNode(String applicantUniqueId, String logDirectory, int startFileVersion) {
+ public void registerWALNode(
+ String applicantUniqueId, String logDirectory, int startFileVersion, long startSearchIndex) {
nodesLock.lock();
try {
if (identifier2Nodes.containsKey(applicantUniqueId)) {
return;
}
- IWALNode walNode = createWALNode(applicantUniqueId, logDirectory, startFileVersion);
+ IWALNode walNode =
+ createWALNode(applicantUniqueId, logDirectory, startFileVersion, startSearchIndex);
if (walNode instanceof WALNode) {
// avoid deletion
((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 2510aa4a75..8cc27f9dfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -41,11 +41,12 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
/** current wal file version id */
protected final AtomicInteger currentWALFileVersion = new AtomicInteger();
/** current search index */
- protected volatile long currentSearchIndex = 0;
+ protected volatile long currentSearchIndex;
/** current wal file log writer */
protected volatile ILogWriter currentWALFileWriter;
- public AbstractWALBuffer(String identifier, String logDirectory, int startFileVersion)
+ public AbstractWALBuffer(
+ String identifier, String logDirectory, int startFileVersion, long startSearchIndex)
throws FileNotFoundException {
this.identifier = identifier;
this.logDirectory = logDirectory;
@@ -53,6 +54,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
if (!logDirFile.exists() && logDirFile.mkdirs()) {
logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
}
+ currentSearchIndex = startSearchIndex;
currentWALFileVersion.set(startFileVersion);
currentWALFileWriter =
new WALWriter(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 9d7a4e2863..cc8cc4d811 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -78,12 +78,13 @@ public class WALBuffer extends AbstractWALBuffer {
private final ExecutorService syncBufferThread;
public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
- this(identifier, logDirectory, 0);
+ this(identifier, logDirectory, 0, 0L);
}
- public WALBuffer(String identifier, String logDirectory, int startFileVersion)
+ public WALBuffer(
+ String identifier, String logDirectory, int startFileVersion, long startSearchIndex)
throws FileNotFoundException {
- super(identifier, logDirectory, startFileVersion);
+ super(identifier, logDirectory, startFileVersion, startSearchIndex);
allocateBuffers();
serializeThread =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 0bb1972ba2..5b9ce10eb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -109,17 +109,18 @@ public class WALNode implements IWALNode {
private volatile long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
- this(identifier, logDirectory, 0);
+ this(identifier, logDirectory, 0, 0L);
}
- public WALNode(String identifier, String logDirectory, int startFileVersion)
+ public WALNode(
+ String identifier, String logDirectory, int startFileVersion, long startSearchIndex)
throws FileNotFoundException {
this.identifier = identifier;
this.logDirectory = SystemFileFactory.INSTANCE.getFile(logDirectory);
if (!this.logDirectory.exists() && this.logDirectory.mkdirs()) {
logger.info("create folder {} for wal node-{}.", logDirectory, identifier);
}
- this.buffer = new WALBuffer(identifier, logDirectory, startFileVersion);
+ this.buffer = new WALBuffer(identifier, logDirectory, startFileVersion, startSearchIndex);
this.checkpointManager = new CheckpointManager(identifier, logDirectory);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index 50a9c6e3a9..790b661a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.wal.io.WALReader;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -52,8 +54,6 @@ public class WALNodeRecoverTask implements Runnable {
private final CountDownLatch allNodesRecoveredLatch;
/** version id of first valid .wal file */
private int firstValidVersionId = Integer.MAX_VALUE;
- /** version id of last .wal file */
- private int lastVersionId = -1;
private Map<Integer, MemTableInfo> memTableId2Info;
private Map<Integer, UnsealedTsFileRecoverPerformer> memTableId2RecoverPerformer;
@@ -95,19 +95,54 @@ public class WALNodeRecoverTask implements Runnable {
"Successfully recover WAL node in the directory {}, so delete these wal files.",
logDirectory);
} else {
+ // delete checkpoint info to avoid repeated recover
File[] checkpointFiles = CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
for (File checkpointFile : checkpointFiles) {
checkpointFile.delete();
}
+ // recover version id and search index
+ long[] indexInfo = recoverLastSearchIndex();
+ int lastVersionId = (int) indexInfo[0];
+ long lastSearchIndex = indexInfo[1];
WALManager.getInstance()
.registerWALNode(
- logDirectory.getName(), logDirectory.getAbsolutePath(), lastVersionId + 1);
+ logDirectory.getName(),
+ logDirectory.getAbsolutePath(),
+ lastVersionId + 1,
+ lastSearchIndex);
logger.info(
"Successfully recover WAL node in the directory {}, add this node to WALManger.",
logDirectory);
}
}
+ /**
+ * Recovers the version id of the last .wal file in {@code logDirectory} and the largest search
+ * index that can be found for that file.
+ *
+ * <p>Only the file with the highest version id is opened. The search index starts from the value
+ * parsed out of that file's name and is raised to the maximum search index carried by any
+ * insert-row / insert-tablet entry read from the file; entries whose search index equals
+ * {@code InsertNode.NO_CONSENSUS_INDEX} are ignored.
+ *
+ * <p>If reading the file throws, a warning is logged and whatever was gathered up to that point
+ * is returned.
+ *
+ * @return a two-element array: {@code [0]} is the last version id (an int widened to long),
+ *     {@code [1]} is the last search index; {@code {0, 0}} when the directory has no .wal files
+ */
+ private long[] recoverLastSearchIndex() {
+ File[] walFiles = WALFileUtils.listAllWALFiles(logDirectory);
+ if (walFiles == null || walFiles.length == 0) {
+ // NOTE(review): the caller adds 1 to the returned version id, so an empty directory makes the
+ // node start at file version 1 rather than 0 - confirm this is intended
+ return new long[] {0L, 0L};
+ }
+ // get last search index from last wal file
+ WALFileUtils.ascSortByVersionId(walFiles);
+ File lastWALFile = walFiles[walFiles.length - 1];
+ int lastVersionId = WALFileUtils.parseVersionId(lastWALFile.getName());
+ // the file name supplies the initial value, used as-is when no entry below raises it
+ long lastSearchIndex = WALFileUtils.parseStartSearchIndex(lastWALFile.getName());
+ try (WALReader walReader = new WALReader(lastWALFile)) {
+ while (walReader.hasNext()) {
+ WALEntry walEntry = walReader.next();
+ // only these two entry types are cast to InsertNode and inspected for a search index
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
+ lastSearchIndex = Math.max(lastSearchIndex, insertNode.getSearchIndex());
+ }
+ }
+ }
+ } catch (Exception e) {
+ // best effort: keep the values collected so far instead of failing the recovery
+ logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
+ }
+ return new long[] {lastVersionId, lastSearchIndex};
+ }
+
private void recoverInfoFromCheckpoints() {
// parse memTables information
CheckpointRecoverUtils.CheckpointInfo info =
@@ -159,10 +194,6 @@ public class WALNodeRecoverTask implements Runnable {
}
// asc sort by version id
WALFileUtils.ascSortByVersionId(walFiles);
- // update last version id
- if (walFiles.length != 0) {
- lastVersionId = WALFileUtils.parseVersionId(walFiles[walFiles.length - 1].getName());
- }
// read .wal files and redo logs
for (File walFile : walFiles) {
try (WALReader walReader = new WALReader(walFile)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
index 484b6f18fd..e2141aa50e 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
@@ -108,7 +108,8 @@ public class FirstCreateStrategyTest {
try {
for (int i = 0; i < 12; i++) {
String identifier = String.valueOf(i % 6);
- roundRobinStrategy.registerWALNode(identifier, walDirs[0] + File.separator + identifier, 0);
+ roundRobinStrategy.registerWALNode(
+ identifier, walDirs[0] + File.separator + identifier, 0, 0L);
IWALNode walNode = roundRobinStrategy.applyForWALNode(identifier);
if (i < 6) {
walNodes[i] = walNode;