You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/15 07:27:44 UTC

[iotdb] 01/01: Fix storage engine v2 cannot recover

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_storageenginev2_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6719f0429e4814895e6f0c7de323bc4a6298006e
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Apr 15 15:23:18 2022 +0800

    Fix storage engine v2 cannot recover
---
 .../apache/iotdb/db/engine/StorageEngineV2.java    | 100 +++++++++++++--------
 .../iotdb/db/wal/recover/WALRecoverManager.java    |   3 +-
 .../wal/utils/listener/AbstractResultListener.java |   2 +-
 3 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 47f8497f5c..34bcf5e0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -64,8 +64,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -75,6 +77,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class StorageEngineV2 implements IService {
   private static final Logger logger = LoggerFactory.getLogger(StorageEngineV2.class);
@@ -103,6 +106,9 @@ public class StorageEngineV2 implements IService {
   private final ConcurrentHashMap<ConsensusGroupId, DataRegion> dataRegionMap =
       new ConcurrentHashMap<>();
 
+  /** number of ready virtual storage group processors */
+  private AtomicInteger readyDataRegionNum;
+
   private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
 
   private ScheduledExecutorService ttlCheckThread;
@@ -199,16 +205,19 @@ public class StorageEngineV2 implements IService {
   public void recover() {
     setAllSgReady(false);
     recoveryThreadPool =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
-    try {
-      getLocalDataRegion();
-    } catch (Exception e) {
-      throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
-    }
+        IoTDBThreadPoolFactory.newCachedThreadPool(
+            ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+
     List<Future<Void>> futures = new LinkedList<>();
     asyncRecover(recoveryThreadPool, futures);
 
+    // wait until wal is recovered
+    try {
+      WALRecoverManager.getInstance().recover();
+    } catch (org.apache.iotdb.db.wal.exception.WALException e) {
+      logger.error("Fail to recover wal.", e);
+    }
+
     // operations after all virtual storage groups are recovered
     Thread recoverEndTrigger =
         new Thread(
@@ -229,44 +238,57 @@ public class StorageEngineV2 implements IService {
     recoverEndTrigger.start();
   }
 
-  private void getLocalDataRegion() throws MetadataException, DataRegionException {
-    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
-    File[] sgDirs = system.listFiles();
-    int sgCount = 0;
-    for (File sgDir : sgDirs) {
-      if (!sgDir.isDirectory()) {
-        sgCount++;
-      }
-    }
+  private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
+    Map<String, List<String>> localDataRegionInfo = getLocalDataRegionInfo();
+    readyDataRegionNum = new AtomicInteger(0);
+    // one latch count per data region directory found on disk, not per storage group
+    int recoverDataRegionNum = localDataRegionInfo.values().stream().mapToInt(List::size).sum();
     // init wal recover manager
     WALRecoverManager.getInstance()
-        .setAllDataRegionScannedLatch(new CountDownLatch(sgCount * config.getDataRegionNum()));
+        .setAllDataRegionScannedLatch(new CountDownLatch(recoverDataRegionNum));
+    for (Map.Entry<String, List<String>> entry : localDataRegionInfo.entrySet()) {
+      String sgName = entry.getKey();
+      for (String dataRegionId : entry.getValue()) {
+        Callable<Void> recoverDataRegionTask =
+            () -> {
+              DataRegion dataRegion;
+              try {
+                dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+              } catch (DataRegionException e) {
+                logger.error("Failed to recover data region {}[{}]", sgName, dataRegionId, e);
+                return null;
+              }
+              dataRegionMap.put(new DataRegionId(Integer.parseInt(dataRegionId)), dataRegion);
+              dataRegion.setReady(true);
+              logger.info(
+                  "Data regions have been recovered {}", readyDataRegionNum.incrementAndGet());
+              return null;
+            };
+        futures.add(pool.submit(recoverDataRegionTask));
+      }
+    }
+  }
+
+  private Map<String, List<String>> getLocalDataRegionInfo() {
+    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
+    File[] listedSgDirs = system.listFiles();
+    File[] sgDirs = listedSgDirs == null ? new File[0] : listedSgDirs;
+    Map<String, List<String>> localDataRegionInfo = new HashMap<>();
     for (File sgDir : sgDirs) {
       if (!sgDir.isDirectory()) {
         continue;
       }
-      String sg = sgDir.getName();
-      long ttl = Long.MAX_VALUE;
+      String sgName = sgDir.getName();
+      List<String> dataRegionIdList = new ArrayList<>();
       for (File dataRegionDir : sgDir.listFiles()) {
         if (!dataRegionDir.isDirectory()) {
           continue;
         }
-        ConsensusGroupId dataRegionId = new DataRegionId(Integer.parseInt(dataRegionDir.getName()));
-        DataRegion dataRegion = buildNewDataRegion(sg, dataRegionDir.getName(), ttl);
-        dataRegionMap.putIfAbsent(dataRegionId, dataRegion);
+        dataRegionIdList.add(dataRegionDir.getName());
       }
+      localDataRegionInfo.put(sgName, dataRegionIdList);
     }
-  }
-
-  private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
-    for (DataRegion processor : dataRegionMap.values()) {
-      Callable<Void> recoverVsgTask =
-          () -> {
-            processor.setReady(true);
-            return null;
-          };
-      futures.add(pool.submit(recoverVsgTask));
-    }
+    return localDataRegionInfo;
   }
 
   @Override
@@ -437,21 +459,21 @@ public class StorageEngineV2 implements IService {
    */
   public DataRegion buildNewDataRegion(
       String logicalStorageGroupName, String dataRegionId, long ttl) throws DataRegionException {
-    DataRegion processor;
+    DataRegion dataRegion;
     logger.info(
         "construct a data region instance, the storage group is {}, Thread is {}",
         logicalStorageGroupName,
         Thread.currentThread().getId());
-    processor =
+    dataRegion =
         new DataRegion(
             systemDir + File.separator + logicalStorageGroupName,
             dataRegionId,
             fileFlushPolicy,
             logicalStorageGroupName);
-    processor.setDataTTL(ttl);
-    processor.setCustomFlushListeners(customFlushListeners);
-    processor.setCustomCloseFileListeners(customCloseFileListeners);
-    return processor;
+    dataRegion.setDataTTL(ttl);
+    dataRegion.setCustomFlushListeners(customFlushListeners);
+    dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
+    return dataRegion;
   }
 
   /** This function is just for unit test. */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index c22a702a1c..c87ff5ad98 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.exception.WALRecoverException;
 import org.apache.iotdb.db.wal.node.WALNode;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -45,7 +44,7 @@ import java.util.concurrent.ExecutorService;
 
 /** First set allVsgScannedLatch, then call recover method. */
 public class WALRecoverManager {
-  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(WALRecoverManager.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   /** start recovery after all virtual storage groups have submitted unsealed zero-level TsFiles */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
index beb54592b7..349e464007 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
@@ -66,8 +66,8 @@ public abstract class AbstractResultListener {
         try {
           this.wait();
         } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
           logger.warn("Interrupted when waiting for result.", e);
+          Thread.currentThread().interrupt();
         }
       }
     }