You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/15 07:27:43 UTC

[iotdb] branch fix_storageenginev2_recover created (now 6719f0429e)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch fix_storageenginev2_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 6719f0429e Fix storage engine v2 cannot recover

This branch includes the following new commits:

     new 6719f0429e Fix storage engine v2 cannot recover

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Fix storage engine v2 cannot recover

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_storageenginev2_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6719f0429e4814895e6f0c7de323bc4a6298006e
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Apr 15 15:23:18 2022 +0800

    Fix storage engine v2 cannot recover
---
 .../apache/iotdb/db/engine/StorageEngineV2.java    | 100 +++++++++++++--------
 .../iotdb/db/wal/recover/WALRecoverManager.java    |   3 +-
 .../wal/utils/listener/AbstractResultListener.java |   2 +-
 3 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 47f8497f5c..34bcf5e0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -64,8 +64,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -75,6 +77,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class StorageEngineV2 implements IService {
   private static final Logger logger = LoggerFactory.getLogger(StorageEngineV2.class);
@@ -103,6 +106,9 @@ public class StorageEngineV2 implements IService {
   private final ConcurrentHashMap<ConsensusGroupId, DataRegion> dataRegionMap =
       new ConcurrentHashMap<>();
 
+  /** number of ready virtual storage group processors */
+  private AtomicInteger readyDataRegionNum;
+
   private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
 
   private ScheduledExecutorService ttlCheckThread;
@@ -199,16 +205,19 @@ public class StorageEngineV2 implements IService {
   public void recover() {
     setAllSgReady(false);
     recoveryThreadPool =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
-    try {
-      getLocalDataRegion();
-    } catch (Exception e) {
-      throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
-    }
+        IoTDBThreadPoolFactory.newCachedThreadPool(
+            ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+
     List<Future<Void>> futures = new LinkedList<>();
     asyncRecover(recoveryThreadPool, futures);
 
+    // wait until wal is recovered
+    try {
+      WALRecoverManager.getInstance().recover();
+    } catch (org.apache.iotdb.db.wal.exception.WALException e) {
+      logger.error("Fail to recover wal.", e);
+    }
+
     // operations after all virtual storage groups are recovered
     Thread recoverEndTrigger =
         new Thread(
@@ -229,44 +238,57 @@ public class StorageEngineV2 implements IService {
     recoverEndTrigger.start();
   }
 
-  private void getLocalDataRegion() throws MetadataException, DataRegionException {
-    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
-    File[] sgDirs = system.listFiles();
-    int sgCount = 0;
-    for (File sgDir : sgDirs) {
-      if (!sgDir.isDirectory()) {
-        sgCount++;
-      }
-    }
+  private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
+
+    Map<String, List<String>> localDataRegionInfo = getLocalDataRegionInfo();
+    readyDataRegionNum = new AtomicInteger(0);
     // init wal recover manager
     WALRecoverManager.getInstance()
-        .setAllDataRegionScannedLatch(new CountDownLatch(sgCount * config.getDataRegionNum()));
+        .setAllDataRegionScannedLatch(
+            new CountDownLatch(localDataRegionInfo.size() * config.getDataRegionNum()));
+    for (Map.Entry<String, List<String>> entry : localDataRegionInfo.entrySet()) {
+      String sgName = entry.getKey();
+      for (String dataRegionId : entry.getValue()) {
+        Callable<Void> recoverDataRegionTask =
+            () -> {
+              DataRegion dataRegion = null;
+              try {
+                dataRegion =
+                    StorageEngineV2.getInstance()
+                        .buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+              } catch (DataRegionException e) {
+                logger.error("Failed to recover data region {}[{}]", sgName, dataRegionId, e);
+              }
+              dataRegionMap.put(new DataRegionId(Integer.parseInt(dataRegionId)), dataRegion);
+              dataRegion.setReady(true);
+              logger.info(
+                  "Data regions have been recovered {}", readyDataRegionNum.incrementAndGet());
+              return null;
+            };
+        futures.add(pool.submit(recoverDataRegionTask));
+      }
+    }
+  }
+
+  private Map<String, List<String>> getLocalDataRegionInfo() {
+    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
+    File[] sgDirs = system.listFiles();
+    Map<String, List<String>> localDataRegionInfo = new HashMap<>();
     for (File sgDir : sgDirs) {
       if (!sgDir.isDirectory()) {
         continue;
       }
-      String sg = sgDir.getName();
-      long ttl = Long.MAX_VALUE;
+      String sgName = sgDir.getName();
+      List<String> dataRegionIdList = new ArrayList<>();
       for (File dataRegionDir : sgDir.listFiles()) {
         if (!dataRegionDir.isDirectory()) {
           continue;
         }
-        ConsensusGroupId dataRegionId = new DataRegionId(Integer.parseInt(dataRegionDir.getName()));
-        DataRegion dataRegion = buildNewDataRegion(sg, dataRegionDir.getName(), ttl);
-        dataRegionMap.putIfAbsent(dataRegionId, dataRegion);
+        dataRegionIdList.add(dataRegionDir.getName());
       }
+      localDataRegionInfo.put(sgName, dataRegionIdList);
     }
-  }
-
-  private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
-    for (DataRegion processor : dataRegionMap.values()) {
-      Callable<Void> recoverVsgTask =
-          () -> {
-            processor.setReady(true);
-            return null;
-          };
-      futures.add(pool.submit(recoverVsgTask));
-    }
+    return localDataRegionInfo;
   }
 
   @Override
@@ -437,21 +459,21 @@ public class StorageEngineV2 implements IService {
    */
   public DataRegion buildNewDataRegion(
       String logicalStorageGroupName, String dataRegionId, long ttl) throws DataRegionException {
-    DataRegion processor;
+    DataRegion dataRegion;
     logger.info(
         "construct a data region instance, the storage group is {}, Thread is {}",
         logicalStorageGroupName,
         Thread.currentThread().getId());
-    processor =
+    dataRegion =
         new DataRegion(
             systemDir + File.separator + logicalStorageGroupName,
             dataRegionId,
             fileFlushPolicy,
             logicalStorageGroupName);
-    processor.setDataTTL(ttl);
-    processor.setCustomFlushListeners(customFlushListeners);
-    processor.setCustomCloseFileListeners(customCloseFileListeners);
-    return processor;
+    dataRegion.setDataTTL(ttl);
+    dataRegion.setCustomFlushListeners(customFlushListeners);
+    dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
+    return dataRegion;
   }
 
   /** This function is just for unit test. */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index c22a702a1c..c87ff5ad98 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.exception.WALRecoverException;
 import org.apache.iotdb.db.wal.node.WALNode;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -45,7 +44,7 @@ import java.util.concurrent.ExecutorService;
 
 /** First set allVsgScannedLatch, then call recover method. */
 public class WALRecoverManager {
-  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(WALRecoverManager.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   /** start recovery after all virtual storage groups have submitted unsealed zero-level TsFiles */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
index beb54592b7..349e464007 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
@@ -66,8 +66,8 @@ public abstract class AbstractResultListener {
         try {
           this.wait();
         } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
           logger.warn("Interrupted when waiting for result.", e);
+          Thread.currentThread().interrupt();
         }
       }
     }