You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/07/04 03:02:51 UTC

[iotdb] branch master updated: [IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e2679d16c8 [IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)
e2679d16c8 is described below

commit e2679d16c8ceede4468c669f25128ded5a039c46
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon Jul 4 11:02:46 2022 +0800

    [IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)
    
    [IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)
---
 .../iotdb/db/wal/recover/WALRecoverManager.java    | 63 ++++++++++++++++------
 1 file changed, 48 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index 267b625dc0..e4a43dae39 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.wal.exception.WALRecoverException;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
 import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
@@ -37,9 +38,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /** First set allVsgScannedLatch, then call recover method. */
 public class WALRecoverManager {
@@ -103,21 +107,7 @@ public class WALRecoverManager {
         }
       }
       // deal with remaining TsFiles which don't have wal
-      for (UnsealedTsFileRecoverPerformer recoverPerformer :
-          absolutePath2RecoverPerformer.values()) {
-        try {
-          recoverPerformer.startRecovery();
-          // skip redo logs because it doesn't belong to any wal node
-          recoverPerformer.endRecovery();
-          recoverPerformer.getRecoverListener().succeed();
-        } catch (DataRegionException | IOException e) {
-          logger.error(
-              "Fail to recover unsealed TsFile {}, skip it.",
-              recoverPerformer.getTsFileAbsolutePath(),
-              e);
-          recoverPerformer.getRecoverListener().fail(e);
-        }
-      }
+      asyncRecoverLeftTsFiles();
     } catch (Exception e) {
       for (UnsealedTsFileRecoverPerformer recoverPerformer :
           absolutePath2RecoverPerformer.values()) {
@@ -139,6 +129,49 @@ public class WALRecoverManager {
     logger.info("Successfully recover all wal nodes.");
   }
 
+  private void asyncRecoverLeftTsFiles() {
+    if (absolutePath2RecoverPerformer.isEmpty()) {
+      return;
+    }
+
+    List<Future<Void>> futures = new ArrayList<>();
+    ExecutorService recoverTsFilesThreadPool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(), "TsFile-Recover");
+    // async recover
+    for (UnsealedTsFileRecoverPerformer recoverPerformer : absolutePath2RecoverPerformer.values()) {
+      Callable<Void> recoverTsFileTask =
+          () -> {
+            try {
+              recoverPerformer.startRecovery();
+              // skip redo logs because it doesn't belong to any wal node
+              recoverPerformer.endRecovery();
+              recoverPerformer.getRecoverListener().succeed();
+            } catch (DataRegionException | IOException | WALRecoverException e) {
+              logger.error(
+                  "Fail to recover unsealed TsFile {}, skip it.",
+                  recoverPerformer.getTsFileAbsolutePath(),
+                  e);
+              recoverPerformer.getRecoverListener().fail(e);
+            }
+            return null;
+          };
+      futures.add(recoverTsFilesThreadPool.submit(recoverTsFileTask));
+    }
+    // wait until all tasks done
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
+      }
+    }
+    recoverTsFilesThreadPool.shutdown();
+  }
+
   public WALRecoverListener addRecoverPerformer(UnsealedTsFileRecoverPerformer recoverPerformer) {
     if (hasStarted) {
       logger.error("Cannot recover tsfile from wal because wal recovery has already started");