You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/07/04 03:02:51 UTC
[iotdb] branch master updated: [IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e2679d16c8 [IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)
e2679d16c8 is described below
commit e2679d16c8ceede4468c669f25128ded5a039c46
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon Jul 4 11:02:46 2022 +0800
[IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)
[IOTDB-3702] Use thread pool to recover broken tsfiles without wal (#6533)
---
.../iotdb/db/wal/recover/WALRecoverManager.java | 63 ++++++++++++++++------
1 file changed, 48 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index 267b625dc0..e4a43dae39 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.wal.exception.WALRecoverException;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
@@ -37,9 +38,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
/** First set allVsgScannedLatch, then call recover method. */
public class WALRecoverManager {
@@ -103,21 +107,7 @@ public class WALRecoverManager {
}
}
// deal with remaining TsFiles which don't have wal
- for (UnsealedTsFileRecoverPerformer recoverPerformer :
- absolutePath2RecoverPerformer.values()) {
- try {
- recoverPerformer.startRecovery();
- // skip redo logs because it doesn't belong to any wal node
- recoverPerformer.endRecovery();
- recoverPerformer.getRecoverListener().succeed();
- } catch (DataRegionException | IOException e) {
- logger.error(
- "Fail to recover unsealed TsFile {}, skip it.",
- recoverPerformer.getTsFileAbsolutePath(),
- e);
- recoverPerformer.getRecoverListener().fail(e);
- }
- }
+ asyncRecoverLeftTsFiles();
} catch (Exception e) {
for (UnsealedTsFileRecoverPerformer recoverPerformer :
absolutePath2RecoverPerformer.values()) {
@@ -139,6 +129,49 @@ public class WALRecoverManager {
logger.info("Successfully recover all wal nodes.");
}
+  private void asyncRecoverLeftTsFiles() {
+    if (absolutePath2RecoverPerformer.isEmpty()) {
+      return; // nothing left to recover, so no thread pool is created
+    }
+    // Recover the remaining TsFiles (those with no WAL to replay) in parallel, then wait for all.
+    List<Future<Void>> futures = new ArrayList<>();
+    ExecutorService recoverTsFilesThreadPool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(), "TsFile-Recover");
+    // submit one recovery task per remaining TsFile
+    for (UnsealedTsFileRecoverPerformer recoverPerformer : absolutePath2RecoverPerformer.values()) {
+      Callable<Void> recoverTsFileTask =
+          () -> {
+            try {
+              recoverPerformer.startRecovery();
+              // skip redo logs because this TsFile doesn't belong to any wal node
+              recoverPerformer.endRecovery();
+              recoverPerformer.getRecoverListener().succeed();
+            } catch (DataRegionException | IOException | WALRecoverException e) {
+              logger.error(
+                  "Fail to recover unsealed TsFile {}, skip it.",
+                  recoverPerformer.getTsFileAbsolutePath(),
+                  e);
+              recoverPerformer.getRecoverListener().fail(e); // fail this file only; others continue
+            }
+            return null; // Callable<Void> must return a value
+          };
+      futures.add(recoverTsFilesThreadPool.submit(recoverTsFileTask));
+    }
+    // Wait for all tasks. NOTE(review): a throw here skips shutdown() below — consider try/finally.
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) { // an exception escaped the task's own catch clause
+        throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag before failing
+        throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
+      }
+    }
+    recoverTsFilesThreadPool.shutdown(); // reached only when no future.get() threw
+  }
+
public WALRecoverListener addRecoverPerformer(UnsealedTsFileRecoverPerformer recoverPerformer) {
if (hasStarted) {
logger.error("Cannot recover tsfile from wal because wal recovery has already started");