You are viewing a plain-text version of this content. The canonical version is available at the original link, which was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/16 01:24:49 UTC

[iotdb] branch master updated: [IOTDB-2934] Fix StorageEngineV2 cannot recover (#5558)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e3044dbd9 [IOTDB-2934] Fix StorageEngineV2 cannot recover (#5558)
1e3044dbd9 is described below

commit 1e3044dbd97acad4bccd6e05d204c7f9127a421f
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Apr 16 09:24:42 2022 +0800

    [IOTDB-2934] Fix StorageEngineV2 cannot recover (#5558)
---
 .../apache/iotdb/db/engine/StorageEngineV2.java    | 101 +++++++++++++--------
 .../iotdb/db/wal/recover/WALRecoverManager.java    |   3 +-
 .../db/wal/recover/file/TsFilePlanRedoer.java      |  53 +++++++++++
 .../file/UnsealedTsFileRecoverPerformer.java       |   5 +
 .../wal/utils/listener/AbstractResultListener.java |   2 +-
 5 files changed, 122 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 47f8497f5c..61eb326d6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.wal.exception.WALException;
 import org.apache.iotdb.db.wal.recover.WALRecoverManager;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -64,8 +65,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -75,6 +78,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class StorageEngineV2 implements IService {
   private static final Logger logger = LoggerFactory.getLogger(StorageEngineV2.class);
@@ -103,6 +107,9 @@ public class StorageEngineV2 implements IService {
   private final ConcurrentHashMap<ConsensusGroupId, DataRegion> dataRegionMap =
       new ConcurrentHashMap<>();
 
+  /** number of ready data region */
+  private AtomicInteger readyDataRegionNum;
+
   private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
 
   private ScheduledExecutorService ttlCheckThread;
@@ -114,6 +121,7 @@ public class StorageEngineV2 implements IService {
   // add customized listeners here for flush and close events
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
+  private int recoverDataRegionNum = 0;
 
   private StorageEngineV2() {}
 
@@ -199,16 +207,19 @@ public class StorageEngineV2 implements IService {
   public void recover() {
     setAllSgReady(false);
     recoveryThreadPool =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors(), "Recovery-Thread-Pool");
-    try {
-      getLocalDataRegion();
-    } catch (Exception e) {
-      throw new StorageEngineFailureException("StorageEngine failed to recover.", e);
-    }
+        IoTDBThreadPoolFactory.newCachedThreadPool(
+            ThreadName.DATA_REGION_RECOVER_SERVICE.getName());
+
     List<Future<Void>> futures = new LinkedList<>();
     asyncRecover(recoveryThreadPool, futures);
 
+    // wait until wal is recovered
+    try {
+      WALRecoverManager.getInstance().recover();
+    } catch (WALException e) {
+      logger.error("Fail to recover wal.", e);
+    }
+
     // operations after all virtual storage groups are recovered
     Thread recoverEndTrigger =
         new Thread(
@@ -229,44 +240,56 @@ public class StorageEngineV2 implements IService {
     recoverEndTrigger.start();
   }
 
-  private void getLocalDataRegion() throws MetadataException, DataRegionException {
-    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
-    File[] sgDirs = system.listFiles();
-    int sgCount = 0;
-    for (File sgDir : sgDirs) {
-      if (!sgDir.isDirectory()) {
-        sgCount++;
-      }
-    }
+  private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
+
+    Map<String, List<String>> localDataRegionInfo = getLocalDataRegionInfo();
+    readyDataRegionNum = new AtomicInteger(0);
     // init wal recover manager
     WALRecoverManager.getInstance()
-        .setAllDataRegionScannedLatch(new CountDownLatch(sgCount * config.getDataRegionNum()));
+        .setAllDataRegionScannedLatch(new CountDownLatch(recoverDataRegionNum));
+    for (Map.Entry<String, List<String>> entry : localDataRegionInfo.entrySet()) {
+      String sgName = entry.getKey();
+      for (String dataRegionId : entry.getValue()) {
+        Callable<Void> recoverDataRegionTask =
+            () -> {
+              DataRegion dataRegion = null;
+              try {
+                dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+              } catch (DataRegionException e) {
+                logger.error("Failed to recover data region {}[{}]", sgName, dataRegionId, e);
+              }
+              dataRegionMap.put(new DataRegionId(Integer.parseInt(dataRegionId)), dataRegion);
+              logger.info(
+                  "Data regions have been recovered {}/{}",
+                  readyDataRegionNum.incrementAndGet(),
+                  recoverDataRegionNum);
+              return null;
+            };
+        futures.add(pool.submit(recoverDataRegionTask));
+      }
+    }
+  }
+
+  private Map<String, List<String>> getLocalDataRegionInfo() {
+    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
+    File[] sgDirs = system.listFiles();
+    Map<String, List<String>> localDataRegionInfo = new HashMap<>();
     for (File sgDir : sgDirs) {
       if (!sgDir.isDirectory()) {
         continue;
       }
-      String sg = sgDir.getName();
-      long ttl = Long.MAX_VALUE;
+      String sgName = sgDir.getName();
+      List<String> dataRegionIdList = new ArrayList<>();
       for (File dataRegionDir : sgDir.listFiles()) {
         if (!dataRegionDir.isDirectory()) {
           continue;
         }
-        ConsensusGroupId dataRegionId = new DataRegionId(Integer.parseInt(dataRegionDir.getName()));
-        DataRegion dataRegion = buildNewDataRegion(sg, dataRegionDir.getName(), ttl);
-        dataRegionMap.putIfAbsent(dataRegionId, dataRegion);
+        dataRegionIdList.add(dataRegionDir.getName());
+        recoverDataRegionNum++;
       }
+      localDataRegionInfo.put(sgName, dataRegionIdList);
     }
-  }
-
-  private void asyncRecover(ExecutorService pool, List<Future<Void>> futures) {
-    for (DataRegion processor : dataRegionMap.values()) {
-      Callable<Void> recoverVsgTask =
-          () -> {
-            processor.setReady(true);
-            return null;
-          };
-      futures.add(pool.submit(recoverVsgTask));
-    }
+    return localDataRegionInfo;
   }
 
   @Override
@@ -437,21 +460,21 @@ public class StorageEngineV2 implements IService {
    */
   public DataRegion buildNewDataRegion(
       String logicalStorageGroupName, String dataRegionId, long ttl) throws DataRegionException {
-    DataRegion processor;
+    DataRegion dataRegion;
     logger.info(
         "construct a data region instance, the storage group is {}, Thread is {}",
         logicalStorageGroupName,
         Thread.currentThread().getId());
-    processor =
+    dataRegion =
         new DataRegion(
             systemDir + File.separator + logicalStorageGroupName,
             dataRegionId,
             fileFlushPolicy,
             logicalStorageGroupName);
-    processor.setDataTTL(ttl);
-    processor.setCustomFlushListeners(customFlushListeners);
-    processor.setCustomCloseFileListeners(customCloseFileListeners);
-    return processor;
+    dataRegion.setDataTTL(ttl);
+    dataRegion.setCustomFlushListeners(customFlushListeners);
+    dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
+    return dataRegion;
   }
 
   /** This function is just for unit test. */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index c22a702a1c..c87ff5ad98 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.exception.WALRecoverException;
 import org.apache.iotdb.db.wal.node.WALNode;
 import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -45,7 +44,7 @@ import java.util.concurrent.ExecutorService;
 
 /** First set allVsgScannedLatch, then call recover method. */
 public class WALRecoverManager {
-  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(WALRecoverManager.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   /** start recovery after all virtual storage groups have submitted unsealed zero-level TsFiles */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index 1932488fef..efd5cbf14a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -33,6 +33,9 @@ import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -131,6 +134,56 @@ public class TsFilePlanRedoer {
     }
   }
 
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  void redoInsert(InsertNode node) throws WriteProcessException, QueryProcessException {
+    if (tsFileResource != null) {
+      String deviceId =
+          node.isAligned()
+              ? node.getDevicePath().getDevicePath().getFullPath()
+              : node.getDevicePath().getFullPath();
+      // orders of insert node is guaranteed by storage engine, just check time in the file
+      // the last chunk group may contain the same data with the logs, ignore such logs in seq file
+      long lastEndTime = tsFileResource.getEndTime(deviceId);
+      long minTimeInNode;
+      if (node instanceof InsertRowNode) {
+        minTimeInNode = ((InsertRowNode) node).getTime();
+      } else {
+        minTimeInNode = ((InsertTabletNode) node).getTimes()[0];
+      }
+      if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode && sequence) {
+        return;
+      }
+    }
+    // TODO(getMeasurementSchema)
+    //    plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
+    //    try {
+    //      if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+    //        idTable.getSeriesSchemas(plan);
+    //      } else {
+    //        IoTDB.schemaProcessor.getSeriesSchemasAndReadLockDevice(plan);
+    //        plan.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(plan.getDevicePath()));
+    //      }
+    //    } catch (IOException | MetadataException e) {
+    //      throw new QueryProcessException("can't replay insert logs, ", e);
+    //    }
+
+    if (node instanceof InsertRowNode) {
+      if (node.isAligned()) {
+        recoveryMemTable.insertAlignedRow((InsertRowNode) node);
+      } else {
+        recoveryMemTable.insert((InsertRowNode) node);
+      }
+    } else {
+      if (node.isAligned()) {
+        recoveryMemTable.insertAlignedTablet(
+            (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+      } else {
+        recoveryMemTable.insertTablet(
+            (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+      }
+    }
+  }
+
   private void checkDataTypeAndMarkFailed(final IMeasurementMNode[] mNodes, InsertPlan tPlan) {
     for (int i = 0; i < mNodes.length; i++) {
       if (mNodes[i] == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index bb953b1742..29d60aabf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
@@ -198,6 +199,10 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform
             walRedoer.resetRecoveryMemTable(memTable);
           }
           break;
+        case INSERT_ROW_NODE:
+        case INSERT_TABLET_NODE:
+          walRedoer.redoInsert((InsertNode) walEntry.getValue());
+          break;
       }
     } catch (Exception e) {
       logger.warn("meet error when redo wal of {}", tsFileResource.getTsFile(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
index beb54592b7..349e464007 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/AbstractResultListener.java
@@ -66,8 +66,8 @@ public abstract class AbstractResultListener {
         try {
           this.wait();
         } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
           logger.warn("Interrupted when waiting for result.", e);
+          Thread.currentThread().interrupt();
         }
       }
     }