You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/15 09:08:35 UTC

[iotdb] branch fix_storageenginev2_recover updated: Fix DataRegion recover logic

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_storageenginev2_recover
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fix_storageenginev2_recover by this push:
     new e01893c916 Fix DataRegion recover logic
e01893c916 is described below

commit e01893c91682b2ac5e16cf0ecc29272f9176b5b2
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Apr 15 17:08:18 2022 +0800

    Fix DataRegion recover logic
---
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  8 ++--
 .../db/wal/recover/file/TsFilePlanRedoer.java      | 53 ++++++++++++++++++++++
 .../file/UnsealedTsFileRecoverPerformer.java       |  5 ++
 3 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 34bcf5e0c9..173668c068 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.wal.exception.WALException;
 import org.apache.iotdb.db.wal.recover.WALRecoverManager;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -214,7 +215,7 @@ public class StorageEngineV2 implements IService {
     // wait until wal is recovered
     try {
       WALRecoverManager.getInstance().recover();
-    } catch (org.apache.iotdb.db.wal.exception.WALException e) {
+    } catch (WALException e) {
       logger.error("Fail to recover wal.", e);
     }
 
@@ -253,14 +254,11 @@ public class StorageEngineV2 implements IService {
             () -> {
               DataRegion dataRegion = null;
               try {
-                dataRegion =
-                    StorageEngineV2.getInstance()
-                        .buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+                dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
               } catch (DataRegionException e) {
                 logger.error("Failed to recover data region {}[{}]", sgName, dataRegionId, e);
               }
               dataRegionMap.put(new DataRegionId(Integer.parseInt(dataRegionId)), dataRegion);
-              dataRegion.setReady(true);
               logger.info(
                   "Data regions have been recovered {}", readyDataRegionNum.incrementAndGet());
               return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index 1932488fef..efd5cbf14a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -33,6 +33,9 @@ import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -131,6 +134,56 @@ public class TsFilePlanRedoer {
     }
   }
 
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  void redoInsert(InsertNode node) throws WriteProcessException, QueryProcessException {
+    if (tsFileResource != null) {
+      String deviceId =
+          node.isAligned()
+              ? node.getDevicePath().getDevicePath().getFullPath()
+              : node.getDevicePath().getFullPath();
+      // orders of insert node is guaranteed by storage engine, just check time in the file
+      // the last chunk group may contain the same data with the logs, ignore such logs in seq file
+      long lastEndTime = tsFileResource.getEndTime(deviceId);
+      long minTimeInNode;
+      if (node instanceof InsertRowNode) {
+        minTimeInNode = ((InsertRowNode) node).getTime();
+      } else {
+        minTimeInNode = ((InsertTabletNode) node).getTimes()[0];
+      }
+      if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode && sequence) {
+        return;
+      }
+    }
+    // TODO(getMeasurementSchema)
+    //    plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
+    //    try {
+    //      if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+    //        idTable.getSeriesSchemas(plan);
+    //      } else {
+    //        IoTDB.schemaProcessor.getSeriesSchemasAndReadLockDevice(plan);
+    //        plan.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(plan.getDevicePath()));
+    //      }
+    //    } catch (IOException | MetadataException e) {
+    //      throw new QueryProcessException("can't replay insert logs, ", e);
+    //    }
+
+    if (node instanceof InsertRowNode) {
+      if (node.isAligned()) {
+        recoveryMemTable.insertAlignedRow((InsertRowNode) node);
+      } else {
+        recoveryMemTable.insert((InsertRowNode) node);
+      }
+    } else {
+      if (node.isAligned()) {
+        recoveryMemTable.insertAlignedTablet(
+            (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+      } else {
+        recoveryMemTable.insertTablet(
+            (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+      }
+    }
+  }
+
   private void checkDataTypeAndMarkFailed(final IMeasurementMNode[] mNodes, InsertPlan tPlan) {
     for (int i = 0; i < mNodes.length; i++) {
       if (mNodes[i] == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index bb953b1742..29d60aabf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
@@ -198,6 +199,10 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform
             walRedoer.resetRecoveryMemTable(memTable);
           }
           break;
+        case INSERT_ROW_NODE:
+        case INSERT_TABLET_NODE:
+          walRedoer.redoInsert((InsertNode) walEntry.getValue());
+          break;
       }
     } catch (Exception e) {
       logger.warn("meet error when redo wal of {}", tsFileResource.getTsFile(), e);