You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/15 07:27:44 UTC
[iotdb] 01/01: Fix storage engine v2 cannot recover
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch fix_storageenginev2_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6719f0429e4814895e6f0c7de323bc4a6298006e
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Apr 15 15:23:18 2022 +0800
Fix storage engine v2 cannot recover
---
.../apache/iotdb/db/engine/StorageEngineV2.java | 100 +++++++++++++--------
.../iotdb/db/wal/recover/WALRecoverManager.java | 3 +-
.../wal/utils/listener/AbstractResultListener.java | 2 +-
3 files changed, 63 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 47f8497f5c..34bcf5e0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -64,8 +64,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -75,6 +77,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class StorageEngineV2 implements IService {
private static final Logger logger = LoggerFactory.getLogger(StorageEngineV2.class);
@@ -103,6 +106,9 @@ public class StorageEngineV2 implements IService {
private final ConcurrentHashMap<ConsensusGroupId, DataRegion> dataRegionMap =
new ConcurrentHashMap<>();
+ /** number of data regions that have finished recovery and been marked ready */
+ private AtomicInteger readyDataRegionNum;
+
private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
private ScheduledExecutorService ttlCheckThread;
@@ -199,16 +205,19 @@ public class StorageEngineV2 implements IService {
public void recover() {
setAllSgReady(false);
recoveryThreadPool =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
- try {
- getLocalDataRegion();
- } catch (Exception e) {
- throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
- }
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+
List<Future<Void>> futures = new LinkedList<>();
asyncRecover(recoveryThreadPool, futures);
+ // wait until wal is recovered
+ try {
+ WALRecoverManager.getInstance().recover();
+ } catch (org.apache.iotdb.db.wal.exception.WALException e) {
+ logger.error("Fail to recover wal.", e);
+ }
+
// operations after all virtual storage groups are recovered
Thread recoverEndTrigger =
new Thread(
@@ -229,44 +238,57 @@ public class StorageEngineV2 implements IService {
recoverEndTrigger.start();
}
- private void getLocalDataRegion() throws MetadataException, DataRegionException {
- File system = SystemFileFactory.INSTANCE.getFile(systemDir);
- File[] sgDirs = system.listFiles();
- int sgCount = 0;
- for (File sgDir : sgDirs) {
- if (!sgDir.isDirectory()) {
- sgCount++;
- }
- }
+ private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
+
+ Map<String, List<String>> localDataRegionInfo = getLocalDataRegionInfo();
+ readyDataRegionNum = new AtomicInteger(0);
// init wal recover manager
WALRecoverManager.getInstance()
- .setAllDataRegionScannedLatch(new CountDownLatch(sgCount * config.getDataRegionNum()));
+ .setAllDataRegionScannedLatch(
+ new CountDownLatch(localDataRegionInfo.size() * config.getDataRegionNum()));
+ for (Map.Entry<String, List<String>> entry : localDataRegionInfo.entrySet()) {
+ String sgName = entry.getKey();
+ for (String dataRegionId : entry.getValue()) {
+ Callable<Void> recoverDataRegionTask =
+ () -> {
+ DataRegion dataRegion = null;
+ try {
+ dataRegion =
+ StorageEngineV2.getInstance()
+ .buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+ } catch (DataRegionException e) {
+ logger.error("Failed to recover data region {}[{}]", sgName, dataRegionId, e);
+ }
+ dataRegionMap.put(new DataRegionId(Integer.parseInt(dataRegionId)), dataRegion);
+ dataRegion.setReady(true);
+ logger.info(
+ "Data regions have been recovered {}", readyDataRegionNum.incrementAndGet());
+ return null;
+ };
+ futures.add(pool.submit(recoverDataRegionTask));
+ }
+ }
+ }
+
+ private Map<String, List<String>> getLocalDataRegionInfo() {
+ File system = SystemFileFactory.INSTANCE.getFile(systemDir);
+ File[] sgDirs = system.listFiles();
+ Map<String, List<String>> localDataRegionInfo = new HashMap<>();
for (File sgDir : sgDirs) {
if (!sgDir.isDirectory()) {
continue;
}
- String sg = sgDir.getName();
- long ttl = Long.MAX_VALUE;
+ String sgName = sgDir.getName();
+ List<String> dataRegionIdList = new ArrayList<>();
for (File dataRegionDir : sgDir.listFiles()) {
if (!dataRegionDir.isDirectory()) {
continue;
}
- ConsensusGroupId dataRegionId = new DataRegionId(Integer.parseInt(dataRegionDir.getName()));
- DataRegion dataRegion = buildNewDataRegion(sg, dataRegionDir.getName(), ttl);
- dataRegionMap.putIfAbsent(dataRegionId, dataRegion);
+ dataRegionIdList.add(dataRegionDir.getName());
}
+ localDataRegionInfo.put(sgName, dataRegionIdList);
}
- }
-
- private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
- for (DataRegion processor : dataRegionMap.values()) {
- Callable<Void> recoverVsgTask =
- () -> {
- processor.setReady(true);
- return null;
- };
- futures.add(pool.submit(recoverVsgTask));
- }
+ return localDataRegionInfo;
}
@Override
@@ -437,21 +459,21 @@ public class StorageEngineV2 implements IService {
*/
public DataRegion buildNewDataRegion(
String logicalStorageGroupName, String dataRegionId, long ttl) throws DataRegionException {
- DataRegion processor;
+ DataRegion dataRegion;
logger.info(
"construct a data region instance, the storage group is {}, Thread is {}",
logicalStorageGroupName,
Thread.currentThread().getId());
- processor =
+ dataRegion =
new DataRegion(
systemDir + File.separator + logicalStorageGroupName,
dataRegionId,
fileFlushPolicy,
logicalStorageGroupName);
- processor.setDataTTL(ttl);
- processor.setCustomFlushListeners(customFlushListeners);
- processor.setCustomCloseFileListeners(customCloseFileListeners);
- return processor;
+ dataRegion.setDataTTL(ttl);
+ dataRegion.setCustomFlushListeners(customFlushListeners);
+ dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
+ return dataRegion;
}
/** This function is just for unit test. */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index c22a702a1c..c87ff5ad98 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.exception.WALRecoverException;
import org.apache.iotdb.db.wal.node.WALNode;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -45,7 +44,7 @@ import java.util.concurrent.ExecutorService;
/** First set allVsgScannedLatch, then call recover method. */
public class WALRecoverManager {
- private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(WALRecoverManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
/** start recovery after all virtual storage groups have submitted unsealed zero-level TsFiles */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
index beb54592b7..349e464007 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
@@ -66,8 +66,8 @@ public abstract class AbstractResultListener {
try {
this.wait();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
logger.warn("Interrupted when waiting for result.", e);
+ Thread.currentThread().interrupt();
}
}
}