You are viewing a plain-text version of this content. Its canonical link (the "here" hyperlink on the original page) is not preserved in this rendering.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/31 15:28:54 UTC

[iotdb] branch master updated: [IOTDB-3350] Recover wal search index when recovering from multi-leader (#6094)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 45e46bf105 [IOTDB-3350] Recover wal search index when recovering from multi-leader (#6094)
45e46bf105 is described below

commit 45e46bf105c62673943514f6d1df1928f45abce0
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue May 31 23:28:49 2022 +0800

    [IOTDB-3350] Recover wal search index when recovering from multi-leader (#6094)
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  1 +
 .../java/org/apache/iotdb/db/wal/WALManager.java   |  6 ++-
 .../allocation/AbstractNodeAllocationStrategy.java |  5 ++-
 .../db/wal/allocation/FirstCreateStrategy.java     |  6 ++-
 .../iotdb/db/wal/buffer/AbstractWALBuffer.java     |  6 ++-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  7 ++--
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |  7 ++--
 .../iotdb/db/wal/recover/WALNodeRecoverTask.java   | 45 ++++++++++++++++++----
 .../db/wal/allocation/FirstCreateStrategyTest.java |  3 +-
 9 files changed, 64 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 642c68e760..8f5610194e 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -3634,6 +3634,7 @@ public class DataRegion {
     return idTable;
   }
 
+  /** This method could only be used in multi-leader consensus */
   public IWALNode getWALNode() {
     if (!config
         .getDataRegionConsensusProtocolClass()
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
index 35c8582bef..8e028ad09a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
@@ -75,7 +75,9 @@ public class WALManager implements IService {
     return walNodesManager.applyForWALNode(applicantUniqueId);
   }
 
-  public void registerWALNode(String applicantUniqueId, String logDirectory, int startFileVersion) {
+  /** Registers a wal node for logDirectory, starting at the given file version and search index. */
+  public void registerWALNode(
+      String applicantUniqueId, String logDirectory, int startFileVersion, long startSearchIndex) {
     if (config.getWalMode() == WALMode.DISABLE
         || !config
             .getDataRegionConsensusProtocolClass()
@@ -84,7 +86,7 @@ public class WALManager implements IService {
     }
 
     ((FirstCreateStrategy) walNodesManager)
-        .registerWALNode(applicantUniqueId, logDirectory, startFileVersion);
+        .registerWALNode(applicantUniqueId, logDirectory, startFileVersion, startSearchIndex);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
index f3e0851006..3b892e847c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
@@ -77,9 +77,10 @@ public abstract class AbstractNodeAllocationStrategy implements NodeAllocationSt
     }
   }
 
-  protected IWALNode createWALNode(String identifier, String folder, int startFileVersion) {
+  protected IWALNode createWALNode(
+      String identifier, String folder, int startFileVersion, long startSearchIndex) {
     try {
-      return new WALNode(identifier, folder, startFileVersion);
+      return new WALNode(identifier, folder, startFileVersion, startSearchIndex);
     } catch (FileNotFoundException e) {
       logger.error("Fail to create wal node", e);
       return WALFakeNode.getFailureInstance(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
index 2fda7da63e..a008ee3a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -60,14 +60,16 @@ public class FirstCreateStrategy extends AbstractNodeAllocationStrategy {
     }
   }
 
-  public void registerWALNode(String applicantUniqueId, String logDirectory, int startFileVersion) {
+  public void registerWALNode(
+      String applicantUniqueId, String logDirectory, int startFileVersion, long startSearchIndex) {
     nodesLock.lock();
     try {
       if (identifier2Nodes.containsKey(applicantUniqueId)) {
         return;
       }
 
-      IWALNode walNode = createWALNode(applicantUniqueId, logDirectory, startFileVersion);
+      IWALNode walNode =
+          createWALNode(applicantUniqueId, logDirectory, startFileVersion, startSearchIndex);
       if (walNode instanceof WALNode) {
         // avoid deletion
         ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 2510aa4a75..8cc27f9dfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -41,11 +41,12 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
   /** current wal file version id */
   protected final AtomicInteger currentWALFileVersion = new AtomicInteger();
   /** current search index */
-  protected volatile long currentSearchIndex = 0;
+  protected volatile long currentSearchIndex;
   /** current wal file log writer */
   protected volatile ILogWriter currentWALFileWriter;
 
-  public AbstractWALBuffer(String identifier, String logDirectory, int startFileVersion)
+  public AbstractWALBuffer(
+      String identifier, String logDirectory, int startFileVersion, long startSearchIndex)
       throws FileNotFoundException {
     this.identifier = identifier;
     this.logDirectory = logDirectory;
@@ -53,6 +54,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
     if (!logDirFile.exists() && logDirFile.mkdirs()) {
       logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
     }
+    currentSearchIndex = startSearchIndex;
     currentWALFileVersion.set(startFileVersion);
     currentWALFileWriter =
         new WALWriter(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 9d7a4e2863..cc8cc4d811 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -78,12 +78,13 @@ public class WALBuffer extends AbstractWALBuffer {
   private final ExecutorService syncBufferThread;
 
   public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
-    this(identifier, logDirectory, 0);
+    this(identifier, logDirectory, 0, 0L); // default start: file version 0, search index 0
   }
 
-  public WALBuffer(String identifier, String logDirectory, int startFileVersion)
+  public WALBuffer(
+      String identifier, String logDirectory, int startFileVersion, long startSearchIndex)
       throws FileNotFoundException {
-    super(identifier, logDirectory, startFileVersion);
+    super(identifier, logDirectory, startFileVersion, startSearchIndex);
     allocateBuffers();
     serializeThread =
         IoTDBThreadPoolFactory.newSingleThreadExecutor(
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 0bb1972ba2..5b9ce10eb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -109,17 +109,18 @@ public class WALNode implements IWALNode {
   private volatile long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
 
   public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
-    this(identifier, logDirectory, 0);
+    this(identifier, logDirectory, 0, 0L); // default start: file version 0, search index 0
   }
 
-  public WALNode(String identifier, String logDirectory, int startFileVersion)
+  public WALNode(
+      String identifier, String logDirectory, int startFileVersion, long startSearchIndex)
       throws FileNotFoundException {
     this.identifier = identifier;
     this.logDirectory = SystemFileFactory.INSTANCE.getFile(logDirectory);
     if (!this.logDirectory.exists() && this.logDirectory.mkdirs()) {
       logger.info("create folder {} for wal node-{}.", logDirectory, identifier);
     }
-    this.buffer = new WALBuffer(identifier, logDirectory, startFileVersion);
+    this.buffer = new WALBuffer(identifier, logDirectory, startFileVersion, startSearchIndex);
     this.checkpointManager = new CheckpointManager(identifier, logDirectory);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index 50a9c6e3a9..790b661a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
 import org.apache.iotdb.db.wal.io.WALReader;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -52,8 +54,6 @@ public class WALNodeRecoverTask implements Runnable {
   private final CountDownLatch allNodesRecoveredLatch;
   /** version id of first valid .wal file */
   private int firstValidVersionId = Integer.MAX_VALUE;
-  /** version id of last .wal file */
-  private int lastVersionId = -1;
 
   private Map<Integer, MemTableInfo> memTableId2Info;
   private Map<Integer, UnsealedTsFileRecoverPerformer> memTableId2RecoverPerformer;
@@ -95,19 +95,54 @@ public class WALNodeRecoverTask implements Runnable {
           "Successfully recover WAL node in the directory {}, so delete these wal files.",
           logDirectory);
     } else {
+      // delete checkpoint info to avoid repeated recover
       File[] checkpointFiles = CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
       for (File checkpointFile : checkpointFiles) {
         checkpointFile.delete();
       }
+      // recover version id and search index
+      long[] indexInfo = recoverLastSearchIndex();
+      int lastVersionId = (int) indexInfo[0];
+      long lastSearchIndex = indexInfo[1];
       WALManager.getInstance()
           .registerWALNode(
-              logDirectory.getName(), logDirectory.getAbsolutePath(), lastVersionId + 1);
+              logDirectory.getName(),
+              logDirectory.getAbsolutePath(),
+              lastVersionId + 1,
+              lastSearchIndex);
       logger.info(
           "Successfully recover WAL node in the directory {}, add this node to WALManger.",
           logDirectory);
     }
   }
 
+  /** Returns {last .wal file version id (-1 if none), last search index recorded in it}. */
+  private long[] recoverLastSearchIndex() {
+    File[] walFiles = WALFileUtils.listAllWALFiles(logDirectory);
+    if (walFiles == null || walFiles.length == 0) {
+      return new long[] {-1L, 0L}; // -1 so the caller registers the node at version 0
+    }
+    WALFileUtils.ascSortByVersionId(walFiles);
+    File lastWALFile = walFiles[walFiles.length - 1];
+    int lastVersionId = WALFileUtils.parseVersionId(lastWALFile.getName());
+    long lastSearchIndex = WALFileUtils.parseStartSearchIndex(lastWALFile.getName());
+    try (WALReader walReader = new WALReader(lastWALFile)) {
+      while (walReader.hasNext()) {
+        WALEntry walEntry = walReader.next();
+        WALEntryType type = walEntry.getType();
+        if (type == WALEntryType.INSERT_TABLET_NODE || type == WALEntryType.INSERT_ROW_NODE) {
+          long searchIndex = ((InsertNode) walEntry.getValue()).getSearchIndex();
+          if (searchIndex != InsertNode.NO_CONSENSUS_INDEX) { // skip NO_CONSENSUS_INDEX entries
+            lastSearchIndex = Math.max(lastSearchIndex, searchIndex);
+          }
+        }
+      }
+    } catch (Exception e) { // best effort: keep the max found before the read failure
+      logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
+    }
+    return new long[] {lastVersionId, lastSearchIndex};
+  }
+
   private void recoverInfoFromCheckpoints() {
     // parse memTables information
     CheckpointRecoverUtils.CheckpointInfo info =
@@ -159,10 +194,6 @@ public class WALNodeRecoverTask implements Runnable {
     }
     // asc sort by version id
     WALFileUtils.ascSortByVersionId(walFiles);
-    // update last version id
-    if (walFiles.length != 0) {
-      lastVersionId = WALFileUtils.parseVersionId(walFiles[walFiles.length - 1].getName());
-    }
     // read .wal files and redo logs
     for (File walFile : walFiles) {
       try (WALReader walReader = new WALReader(walFile)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
index 484b6f18fd..e2141aa50e 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
@@ -108,7 +108,8 @@ public class FirstCreateStrategyTest {
     try {
       for (int i = 0; i < 12; i++) {
         String identifier = String.valueOf(i % 6);
-        roundRobinStrategy.registerWALNode(identifier, walDirs[0] + File.separator + identifier, 0);
+        roundRobinStrategy.registerWALNode(
+            identifier, walDirs[0] + File.separator + identifier, 0, 0L);
         IWALNode walNode = roundRobinStrategy.applyForWALNode(identifier);
         if (i < 6) {
           walNodes[i] = walNode;