You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/16 01:24:49 UTC
[iotdb] branch master updated: [IOTDB-2934] Fix StorageEngineV2 cannot recover (#5558)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1e3044dbd9 [IOTDB-2934] Fix StorageEngineV2 cannot recover (#5558)
1e3044dbd9 is described below
commit 1e3044dbd97acad4bccd6e05d204c7f9127a421f
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Apr 16 09:24:42 2022 +0800
[IOTDB-2934] Fix StorageEngineV2 cannot recover (#5558)
---
.../apache/iotdb/db/engine/StorageEngineV2.java | 101 +++++++++++++--------
.../iotdb/db/wal/recover/WALRecoverManager.java | 3 +-
.../db/wal/recover/file/TsFilePlanRedoer.java | 53 +++++++++++
.../file/UnsealedTsFileRecoverPerformer.java | 5 +
.../wal/utils/listener/AbstractResultListener.java | 2 +-
5 files changed, 122 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 47f8497f5c..61eb326d6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.wal.exception.WALException;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -64,8 +65,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -75,6 +78,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class StorageEngineV2 implements IService {
private static final Logger logger = LoggerFactory.getLogger(StorageEngineV2.class);
@@ -103,6 +107,9 @@ public class StorageEngineV2 implements IService {
private final ConcurrentHashMap<ConsensusGroupId, DataRegion> dataRegionMap =
new ConcurrentHashMap<>();
+ /** number of ready data regions */
+ private AtomicInteger readyDataRegionNum;
+
private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
private ScheduledExecutorService ttlCheckThread;
@@ -114,6 +121,7 @@ public class StorageEngineV2 implements IService {
// add customized listeners here for flush and close events
private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
private List<FlushListener> customFlushListeners = new ArrayList<>();
+ private int recoverDataRegionNum = 0;
private StorageEngineV2() {}
@@ -199,16 +207,19 @@ public class StorageEngineV2 implements IService {
public void recover() {
setAllSgReady(false);
recoveryThreadPool =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
- try {
- getLocalDataRegion();
- } catch (Exception e) {
- throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
- }
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+
List<Future<Void>> futures = new LinkedList<>();
asyncRecover(recoveryThreadPool, futures);
+ // wait until wal is recovered
+ try {
+ WALRecoverManager.getInstance().recover();
+ } catch (WALException e) {
+ logger.error("Fail to recover wal.", e);
+ }
+
// operations after all virtual storage groups are recovered
Thread recoverEndTrigger =
new Thread(
@@ -229,44 +240,56 @@ public class StorageEngineV2 implements IService {
recoverEndTrigger.start();
}
- private void getLocalDataRegion() throws MetadataException, DataRegionException {
- File system = SystemFileFactory.INSTANCE.getFile(systemDir);
- File[] sgDirs = system.listFiles();
- int sgCount = 0;
- for (File sgDir : sgDirs) {
- if (!sgDir.isDirectory()) {
- sgCount++;
- }
- }
+ private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
+
+ Map<String, List<String>> localDataRegionInfo = getLocalDataRegionInfo();
+ readyDataRegionNum = new AtomicInteger(0);
// init wal recover manager
WALRecoverManager.getInstance()
- .setAllDataRegionScannedLatch(new CountDownLatch(sgCount * config.getDataRegionNum()));
+ .setAllDataRegionScannedLatch(new CountDownLatch(recoverDataRegionNum));
+ for (Map.Entry<String, List<String>> entry : localDataRegionInfo.entrySet()) {
+ String sgName = entry.getKey();
+ for (String dataRegionId : entry.getValue()) {
+ Callable<Void> recoverDataRegionTask =
+ () -> {
+ DataRegion dataRegion = null;
+ try {
+ dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+ } catch (DataRegionException e) {
+ logger.error("Failed to recover data region {}[{}]", sgName, dataRegionId, e);
+ }
+ dataRegionMap.put(new DataRegionId(Integer.parseInt(dataRegionId)), dataRegion);
+ logger.info(
+ "Data regions have been recovered {}/{}",
+ readyDataRegionNum.incrementAndGet(),
+ recoverDataRegionNum);
+ return null;
+ };
+ futures.add(pool.submit(recoverDataRegionTask));
+ }
+ }
+ }
+
+ private Map<String, List<String>> getLocalDataRegionInfo() {
+ File system = SystemFileFactory.INSTANCE.getFile(systemDir);
+ File[] sgDirs = system.listFiles();
+ Map<String, List<String>> localDataRegionInfo = new HashMap<>();
for (File sgDir : sgDirs) {
if (!sgDir.isDirectory()) {
continue;
}
- String sg = sgDir.getName();
- long ttl = Long.MAX_VALUE;
+ String sgName = sgDir.getName();
+ List<String> dataRegionIdList = new ArrayList<>();
for (File dataRegionDir : sgDir.listFiles()) {
if (!dataRegionDir.isDirectory()) {
continue;
}
- ConsensusGroupId dataRegionId = new DataRegionId(Integer.parseInt(dataRegionDir.getName()));
- DataRegion dataRegion = buildNewDataRegion(sg, dataRegionDir.getName(), ttl);
- dataRegionMap.putIfAbsent(dataRegionId, dataRegion);
+ dataRegionIdList.add(dataRegionDir.getName());
+ recoverDataRegionNum++;
}
+ localDataRegionInfo.put(sgName, dataRegionIdList);
}
- }
-
- private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
- for (DataRegion processor : dataRegionMap.values()) {
- Callable<Void> recoverVsgTask =
- () -> {
- processor.setReady(true);
- return null;
- };
- futures.add(pool.submit(recoverVsgTask));
- }
+ return localDataRegionInfo;
}
@Override
@@ -437,21 +460,21 @@ public class StorageEngineV2 implements IService {
*/
public DataRegion buildNewDataRegion(
String logicalStorageGroupName, String dataRegionId, long ttl) throws DataRegionException {
- DataRegion processor;
+ DataRegion dataRegion;
logger.info(
"construct a data region instance, the storage group is {}, Thread is {}",
logicalStorageGroupName,
Thread.currentThread().getId());
- processor =
+ dataRegion =
new DataRegion(
systemDir + File.separator + logicalStorageGroupName,
dataRegionId,
fileFlushPolicy,
logicalStorageGroupName);
- processor.setDataTTL(ttl);
- processor.setCustomFlushListeners(customFlushListeners);
- processor.setCustomCloseFileListeners(customCloseFileListeners);
- return processor;
+ dataRegion.setDataTTL(ttl);
+ dataRegion.setCustomFlushListeners(customFlushListeners);
+ dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
+ return dataRegion;
}
/** This function is just for unit test. */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index c22a702a1c..c87ff5ad98 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.exception.WALRecoverException;
import org.apache.iotdb.db.wal.node.WALNode;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -45,7 +44,7 @@ import java.util.concurrent.ExecutorService;
/** First set allVsgScannedLatch, then call recover method. */
public class WALRecoverManager {
- private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(WALRecoverManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
/** start recovery after all virtual storage groups have submitted unsealed zero-level TsFiles */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index 1932488fef..efd5cbf14a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -33,6 +33,9 @@ import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -131,6 +134,56 @@ public class TsFilePlanRedoer {
}
}
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ void redoInsert(InsertNode node) throws WriteProcessException, QueryProcessException {
+ if (tsFileResource != null) {
+ String deviceId =
+ node.isAligned()
+ ? node.getDevicePath().getDevicePath().getFullPath()
+ : node.getDevicePath().getFullPath();
+ // the order of insert nodes is guaranteed by the storage engine, so just check the time in the file
+ // the last chunk group may contain the same data as the logs; ignore such logs in seq files
+ long lastEndTime = tsFileResource.getEndTime(deviceId);
+ long minTimeInNode;
+ if (node instanceof InsertRowNode) {
+ minTimeInNode = ((InsertRowNode) node).getTime();
+ } else {
+ minTimeInNode = ((InsertTabletNode) node).getTimes()[0];
+ }
+ if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode && sequence) {
+ return;
+ }
+ }
+ // TODO(getMeasurementSchema)
+ // plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
+ // try {
+ // if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+ // idTable.getSeriesSchemas(plan);
+ // } else {
+ // IoTDB.schemaProcessor.getSeriesSchemasAndReadLockDevice(plan);
+ // plan.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(plan.getDevicePath()));
+ // }
+ // } catch (IOException | MetadataException e) {
+ // throw new QueryProcessException("can't replay insert logs, ", e);
+ // }
+
+ if (node instanceof InsertRowNode) {
+ if (node.isAligned()) {
+ recoveryMemTable.insertAlignedRow((InsertRowNode) node);
+ } else {
+ recoveryMemTable.insert((InsertRowNode) node);
+ }
+ } else {
+ if (node.isAligned()) {
+ recoveryMemTable.insertAlignedTablet(
+ (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+ } else {
+ recoveryMemTable.insertTablet(
+ (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+ }
+ }
+ }
+
private void checkDataTypeAndMarkFailed(final IMeasurementMNode[] mNodes, InsertPlan tPlan) {
for (int i = 0; i < mNodes.length; i++) {
if (mNodes[i] == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index bb953b1742..29d60aabf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.wal.buffer.WALEntry;
@@ -198,6 +199,10 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform
walRedoer.resetRecoveryMemTable(memTable);
}
break;
+ case INSERT_ROW_NODE:
+ case INSERT_TABLET_NODE:
+ walRedoer.redoInsert((InsertNode) walEntry.getValue());
+ break;
}
} catch (Exception e) {
logger.warn("meet error when redo wal of {}", tsFileResource.getTsFile(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
index beb54592b7..349e464007 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
@@ -66,8 +66,8 @@ public abstract class AbstractResultListener {
try {
this.wait();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
logger.warn("Interrupted when waiting for result.", e);
+ Thread.currentThread().interrupt();
}
}
}