You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/16 15:27:30 UTC

[iotdb] branch master updated: [IOTDB-4965] Fix ttl info lost in data region after datanode restarted (#8018)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 31e93324ea [IOTDB-4965] Fix ttl info lost in data region after datanode restarted (#8018)
31e93324ea is described below

commit 31e93324eaa8ace62c3c0b9e1030b58c87e0cb7b
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Nov 16 23:27:24 2022 +0800

    [IOTDB-4965] Fix ttl info lost in data region after datanode restarted (#8018)
---
 .../consensus/response/DataNodeRegisterResp.java   | 20 +++++++++++++++++
 .../confignode/manager/ClusterSchemaManager.java   | 17 ++++++++++++++
 .../iotdb/confignode/manager/ConfigManager.java    |  1 +
 .../apache/iotdb/db/engine/StorageEngineV2.java    | 26 +++++++++++++++++++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |  3 +++
 .../src/main/thrift/confignode.thrift              |  1 +
 6 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index a4d9a13b72..bf4e64f57c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -26,9 +26,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 
 public class DataNodeRegisterResp implements DataSet {
 
@@ -43,6 +47,8 @@ public class DataNodeRegisterResp implements DataSet {
   private List<ByteBuffer> allTriggerInformation;
   private List<ByteBuffer> allUDFInformation;
 
+  private byte[] allTTLInformation;
+
   public DataNodeRegisterResp() {
     this.dataNodeId = null;
     this.globalConfig = null;
@@ -92,6 +98,19 @@ public class DataNodeRegisterResp implements DataSet {
     this.allUDFInformation = allUDFInformation;
   }
 
+  public void setAllTTLInformation(Map<String, Long> allTTLInformation) {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try {
+      ReadWriteIOUtils.write(allTTLInformation.size(), outputStream); // entry count goes first
+      for (Map.Entry<String, Long> entry : allTTLInformation.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream); // then each key ...
+        ReadWriteIOUtils.write(entry.getValue(), outputStream); // ... followed by its TTL value
+      }
+    } catch (IOException ignored) { // NOTE(review): silently swallowed; in-memory stream, so
+    } // presumably unreachable - otherwise a truncated buffer would be stored below
+    this.allTTLInformation = outputStream.toByteArray();
+  }
+
   public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
     TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
     resp.setStatus(status);
@@ -107,6 +126,7 @@ public class DataNodeRegisterResp implements DataSet {
       resp.setCqConfig(cqConfig);
       resp.setAllTriggerInformation(allTriggerInformation);
       resp.setAllUDFInformation(allUDFInformation);
+      resp.setAllTTLInformation(allTTLInformation);
     }
 
     return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 008bb69b11..7c11cf96ff 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -82,6 +82,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -206,6 +207,22 @@ public class ClusterSchemaManager {
     return new TShowStorageGroupResp().setStorageGroupInfoMap(infoMap).setStatus(StatusUtils.OK);
   }
 
+  public Map<String, Long> getAllTTLInfo() {
+    StorageGroupSchemaResp storageGroupSchemaResp =
+        (StorageGroupSchemaResp)
+            getMatchedStorageGroupSchema(new GetStorageGroupPlan(Arrays.asList("root", "**")));
+    Map<String, Long> infoMap = new ConcurrentHashMap<>();
+    if (storageGroupSchemaResp.getStatus().getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      // Return an empty map if the schema lookup did not succeed
+      return infoMap;
+    }
+    for (TStorageGroupSchema storageGroupSchema : storageGroupSchemaResp.getSchemaMap().values()) {
+      infoMap.put(storageGroupSchema.getName(), storageGroupSchema.getTTL());
+    }
+    return infoMap;
+  }
+
   /**
    * Update TTL for the specific StorageGroup or all databases in a path
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 747a5c00f2..91893e5c48 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -264,6 +264,7 @@ public class ConfigManager implements IManager {
         dataSet.setTriggerInformation(
             triggerManager.getTriggerTable(false).getAllTriggerInformation());
         dataSet.setAllUDFInformation(udfManager.getUDFTable().getAllUDFInformation());
+        dataSet.setAllTTLInformation(clusterSchemaManager.getAllTTLInfo());
       } finally {
         triggerManager.getTriggerInfo().releaseTriggerTableLock();
         udfManager.getUdfInfo().releaseUDFTableLock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index edc2483679..ac4492cbc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -70,12 +71,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -116,6 +119,9 @@ public class StorageEngineV2 implements IService {
   private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap =
       new ConcurrentHashMap<>();
 
+  /** Database name -> ttl, for region recovery only */
+  private final Map<String, Long> ttlMapForRecover = new ConcurrentHashMap<>();
+
   /** number of ready data region */
   private AtomicInteger readyDataRegionNum;
 
@@ -189,6 +195,19 @@ public class StorageEngineV2 implements IService {
     }
   }
 
+  public void updateTTLInfo(byte[] allTTLInformation) {
+    if (allTTLInformation == null) { // nothing to decode
+      return;
+    }
+    ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation);
+    int mapSize = ReadWriteIOUtils.readInt(buffer); // leading int = number of entries that follow
+    for (int i = 0; i < mapSize; i++) {
+      ttlMapForRecover.put(
+          Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)), // map key; null rejected
+          ReadWriteIOUtils.readLong(buffer)); // TTL value stored for that key
+    }
+  }
+
   public boolean isAllSgReady() {
     return isAllSgReady.get();
   }
@@ -222,6 +241,7 @@ public class StorageEngineV2 implements IService {
             () -> {
               checkResults(futures, "StorageEngine failed to recover.");
               setAllSgReady(true);
+              ttlMapForRecover.clear();
             });
     recoverEndTrigger.start();
   }
@@ -240,7 +260,11 @@ public class StorageEngineV2 implements IService {
             () -> {
               DataRegion dataRegion = null;
               try {
-                dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+                dataRegion =
+                    buildNewDataRegion(
+                        sgName,
+                        dataRegionId,
+                        ttlMapForRecover.getOrDefault(sgName, Long.MAX_VALUE));
               } catch (DataRegionException e) {
                 logger.error(
                     "Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 25b74c5971..4a85a5304e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -210,6 +210,9 @@ public class DataNode implements DataNodeMBean {
         // store triggerInformationList
         getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
 
+        // store ttl information
+        StorageEngineV2.getInstance().updateTTLInfo(dataNodeRegisterResp.getAllTTLInformation());
+
         if (dataNodeRegisterResp.getStatus().getCode()
                 == TSStatusCode.SUCCESS_STATUS.getStatusCode()
             || dataNodeRegisterResp.getStatus().getCode()
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index f88d36f301..cbd58d9588 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -39,6 +39,7 @@ struct TDataNodeRegisterResp {
   7: optional list<binary> allTriggerInformation
   8: optional TCQConfig cqConfig
   9: optional list<binary> allUDFInformation
+  10: optional binary allTTLInformation
 }
 
 struct TGlobalConfig {