You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/16 12:22:43 UTC

[iotdb] 02/02: [IOTDB-4965] Fix ttl info lost in data region after datanode restart

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_ttl_lost
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1687e8792cbab672ffa84ddc9e4fbebbfd4b05df
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Nov 16 20:22:25 2022 +0800

    [IOTDB-4965] Fix TTL information being lost in data regions after a DataNode restart
---
 .../consensus/response/DataNodeRegisterResp.java   | 22 ++++++++++---
 .../confignode/manager/ClusterSchemaManager.java   | 37 +++++-----------------
 .../iotdb/confignode/manager/ConfigManager.java    |  2 +-
 .../apache/iotdb/db/engine/StorageEngineV2.java    | 25 ++++++++++++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |  3 ++
 .../src/main/thrift/confignode.thrift              |  2 +-
 6 files changed, 54 insertions(+), 37 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index 05fefc2cf6..bf4e64f57c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 
 public class DataNodeRegisterResp implements DataSet {
 
@@ -44,7 +47,7 @@ public class DataNodeRegisterResp implements DataSet {
   private List<ByteBuffer> allTriggerInformation;
   private List<ByteBuffer> allUDFInformation;
 
-  private List<TStorageGroupInfo> allDatabasesInformation;
+  private byte[] allTTLInformation;
 
   public DataNodeRegisterResp() {
     this.dataNodeId = null;
@@ -95,8 +98,17 @@ public class DataNodeRegisterResp implements DataSet {
     this.allUDFInformation = allUDFInformation;
   }
 
-  public void setAllDatabasesInformation(List<TStorageGroupInfo> allDatabasesInformation) {
-    this.allDatabasesInformation = allDatabasesInformation;
+  public void setAllTTLInformation(Map<String, Long> allTTLInformation) {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try {
+      ReadWriteIOUtils.write(allTTLInformation.size(), outputStream);
+      for (Map.Entry<String, Long> entry : allTTLInformation.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+    } catch (IOException ignored) {
+    }
+    this.allTTLInformation = outputStream.toByteArray();
   }
 
   public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
@@ -114,7 +126,7 @@ public class DataNodeRegisterResp implements DataSet {
       resp.setCqConfig(cqConfig);
       resp.setAllTriggerInformation(allTriggerInformation);
       resp.setAllUDFInformation(allUDFInformation);
-      resp.setAllDatabaseInformation(allDatabasesInformation);
+      resp.setAllTTLInformation(allTTLInformation);
     }
 
     return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index d83536ef73..6481c35ea3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -82,6 +82,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -206,42 +207,20 @@ public class ClusterSchemaManager {
     return new TShowStorageGroupResp().setStorageGroupInfoMap(infoMap).setStatus(StatusUtils.OK);
   }
 
-  public TShowStorageGroupResp showAllTTL() {
+  public Map<String, Long> getAllTTLInfo() {
     StorageGroupSchemaResp storageGroupSchemaResp =
-        (StorageGroupSchemaResp) getMatchedStorageGroupSchema(getStorageGroupPlan);
+        (StorageGroupSchemaResp)
+            getMatchedStorageGroupSchema(new GetStorageGroupPlan(Arrays.asList("root", "**")));
+    Map<String, Long> infoMap = new ConcurrentHashMap<>();
     if (storageGroupSchemaResp.getStatus().getCode()
         != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Return immediately if some StorageGroups doesn't exist
-      return new TShowStorageGroupResp().setStatus(storageGroupSchemaResp.getStatus());
+      return infoMap;
     }
-
-    Map<String, TStorageGroupInfo> infoMap = new ConcurrentHashMap<>();
     for (TStorageGroupSchema storageGroupSchema : storageGroupSchemaResp.getSchemaMap().values()) {
-      String name = storageGroupSchema.getName();
-      TStorageGroupInfo storageGroupInfo = new TStorageGroupInfo();
-      storageGroupInfo.setName(name);
-      storageGroupInfo.setTTL(storageGroupSchema.getTTL());
-      storageGroupInfo.setSchemaReplicationFactor(storageGroupSchema.getSchemaReplicationFactor());
-      storageGroupInfo.setDataReplicationFactor(storageGroupSchema.getDataReplicationFactor());
-      storageGroupInfo.setTimePartitionInterval(storageGroupSchema.getTimePartitionInterval());
-
-      try {
-        storageGroupInfo.setSchemaRegionNum(
-            getPartitionManager().getRegionCount(name, TConsensusGroupType.SchemaRegion));
-        storageGroupInfo.setDataRegionNum(
-            getPartitionManager().getRegionCount(name, TConsensusGroupType.DataRegion));
-      } catch (StorageGroupNotExistsException e) {
-        // Return immediately if some StorageGroups doesn't exist
-        return new TShowStorageGroupResp()
-            .setStatus(
-                new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
-                    .setMessage(e.getMessage()));
-      }
-
-      infoMap.put(name, storageGroupInfo);
+      infoMap.put(storageGroupSchema.getName(), storageGroupSchema.getTTL());
     }
-
-    return new TShowStorageGroupResp().setStorageGroupInfoMap(infoMap).setStatus(StatusUtils.OK);
+    return infoMap;
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 7e88a3cfff..cffee4ccf9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -264,7 +264,7 @@ public class ConfigManager implements IManager {
         dataSet.setTriggerInformation(
             triggerManager.getTriggerTable(false).getAllTriggerInformation());
         dataSet.setAllUDFInformation(udfManager.getUDFTable().getAllUDFInformation());
-        dataSet.setAllDatabasesInformation(clusterSchemaManager.getAllTTLInfo());
+        dataSet.setAllTTLInformation(clusterSchemaManager.getAllTTLInfo());
       } finally {
         triggerManager.getTriggerInfo().releaseTriggerTableLock();
         udfManager.getUdfInfo().releaseUDFTableLock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 94ae35c0cd..27079e5eb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -70,12 +71,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -116,6 +119,9 @@ public class StorageEngineV2 implements IService {
   private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap =
       new ConcurrentHashMap<>();
 
+  /** Database name -> ttl, for region recovery only */
+  private final Map<String, Long> ttlMapForRecover = new ConcurrentHashMap<>();
+
   /** number of ready data region */
   private AtomicInteger readyDataRegionNum;
 
@@ -189,6 +195,19 @@ public class StorageEngineV2 implements IService {
     }
   }
 
+  public void UpdateTTLInfo(byte[] allTTLInformation) {
+    if (allTTLInformation == null) {
+      return;
+    }
+    ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation);
+    int mapSize = ReadWriteIOUtils.readInt(buffer);
+    for (int i = 0; i < mapSize; i++) {
+      ttlMapForRecover.put(
+          Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)),
+          ReadWriteIOUtils.readLong(buffer));
+    }
+  }
+
   public boolean isAllSgReady() {
     return isAllSgReady.get();
   }
@@ -240,7 +259,11 @@ public class StorageEngineV2 implements IService {
             () -> {
               DataRegion dataRegion = null;
               try {
-                dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+                dataRegion =
+                    buildNewDataRegion(
+                        sgName,
+                        dataRegionId,
+                        ttlMapForRecover.getOrDefault(sgName, Long.MAX_VALUE));
               } catch (DataRegionException e) {
                 logger.error(
                     "Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 25b74c5971..ad11794b52 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -210,6 +210,9 @@ public class DataNode implements DataNodeMBean {
         // store triggerInformationList
         getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
 
+        // store ttl information
+        StorageEngineV2.getInstance().UpdateTTLInfo(dataNodeRegisterResp.getAllTTLInformation());
+
         if (dataNodeRegisterResp.getStatus().getCode()
                 == TSStatusCode.SUCCESS_STATUS.getStatusCode()
             || dataNodeRegisterResp.getStatus().getCode()
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 8fa1737a3f..cbd58d9588 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -39,7 +39,7 @@ struct TDataNodeRegisterResp {
   7: optional list<binary> allTriggerInformation
   8: optional TCQConfig cqConfig
   9: optional list<binary> allUDFInformation
-  10: optional list<TStorageGroupInfo> allDatabaseInformation
+  10: optional binary allTTLInformation
 }
 
 struct TGlobalConfig {