You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/16 12:22:43 UTC
[iotdb] 02/02: [IOTDB-4965] Fix ttl info lost in data region after datanode restart
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch fix_ttl_lost
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1687e8792cbab672ffa84ddc9e4fbebbfd4b05df
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Nov 16 20:22:25 2022 +0800
[IOTDB-4965] Fix ttl info lost in data region after datanode restart
---
.../consensus/response/DataNodeRegisterResp.java | 22 ++++++++++---
.../confignode/manager/ClusterSchemaManager.java | 37 +++++-----------------
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../apache/iotdb/db/engine/StorageEngineV2.java | 25 ++++++++++++++-
.../java/org/apache/iotdb/db/service/DataNode.java | 3 ++
.../src/main/thrift/confignode.thrift | 2 +-
6 files changed, 54 insertions(+), 37 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index 05fefc2cf6..bf4e64f57c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
public class DataNodeRegisterResp implements DataSet {
@@ -44,7 +47,7 @@ public class DataNodeRegisterResp implements DataSet {
private List<ByteBuffer> allTriggerInformation;
private List<ByteBuffer> allUDFInformation;
- private List<TStorageGroupInfo> allDatabasesInformation;
+ private byte[] allTTLInformation;
public DataNodeRegisterResp() {
this.dataNodeId = null;
@@ -95,8 +98,17 @@ public class DataNodeRegisterResp implements DataSet {
this.allUDFInformation = allUDFInformation;
}
- public void setAllDatabasesInformation(List<TStorageGroupInfo> allDatabasesInformation) {
- this.allDatabasesInformation = allDatabasesInformation;
+ public void setAllTTLInformation(Map<String, Long> allTTLInformation) {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try {
+ ReadWriteIOUtils.write(allTTLInformation.size(), outputStream);
+ for (Map.Entry<String, Long> entry : allTTLInformation.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ } catch (IOException ignored) {
+ }
+ this.allTTLInformation = outputStream.toByteArray();
}
public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
@@ -114,7 +126,7 @@ public class DataNodeRegisterResp implements DataSet {
resp.setCqConfig(cqConfig);
resp.setAllTriggerInformation(allTriggerInformation);
resp.setAllUDFInformation(allUDFInformation);
- resp.setAllDatabaseInformation(allDatabasesInformation);
+ resp.setAllTTLInformation(allTTLInformation);
}
return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index d83536ef73..6481c35ea3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -82,6 +82,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -206,42 +207,20 @@ public class ClusterSchemaManager {
return new TShowStorageGroupResp().setStorageGroupInfoMap(infoMap).setStatus(StatusUtils.OK);
}
- public TShowStorageGroupResp showAllTTL() {
+ public Map<String, Long> getAllTTLInfo() {
StorageGroupSchemaResp storageGroupSchemaResp =
- (StorageGroupSchemaResp) getMatchedStorageGroupSchema(getStorageGroupPlan);
+ (StorageGroupSchemaResp)
+ getMatchedStorageGroupSchema(new GetStorageGroupPlan(Arrays.asList("root", "**")));
+ Map<String, Long> infoMap = new ConcurrentHashMap<>();
if (storageGroupSchemaResp.getStatus().getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Return immediately if some StorageGroups doesn't exist
- return new TShowStorageGroupResp().setStatus(storageGroupSchemaResp.getStatus());
+ return infoMap;
}
-
- Map<String, TStorageGroupInfo> infoMap = new ConcurrentHashMap<>();
for (TStorageGroupSchema storageGroupSchema : storageGroupSchemaResp.getSchemaMap().values()) {
- String name = storageGroupSchema.getName();
- TStorageGroupInfo storageGroupInfo = new TStorageGroupInfo();
- storageGroupInfo.setName(name);
- storageGroupInfo.setTTL(storageGroupSchema.getTTL());
- storageGroupInfo.setSchemaReplicationFactor(storageGroupSchema.getSchemaReplicationFactor());
- storageGroupInfo.setDataReplicationFactor(storageGroupSchema.getDataReplicationFactor());
- storageGroupInfo.setTimePartitionInterval(storageGroupSchema.getTimePartitionInterval());
-
- try {
- storageGroupInfo.setSchemaRegionNum(
- getPartitionManager().getRegionCount(name, TConsensusGroupType.SchemaRegion));
- storageGroupInfo.setDataRegionNum(
- getPartitionManager().getRegionCount(name, TConsensusGroupType.DataRegion));
- } catch (StorageGroupNotExistsException e) {
- // Return immediately if some StorageGroups doesn't exist
- return new TShowStorageGroupResp()
- .setStatus(
- new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
- .setMessage(e.getMessage()));
- }
-
- infoMap.put(name, storageGroupInfo);
+ infoMap.put(storageGroupSchema.getName(), storageGroupSchema.getTTL());
}
-
- return new TShowStorageGroupResp().setStorageGroupInfoMap(infoMap).setStatus(StatusUtils.OK);
+ return infoMap;
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 7e88a3cfff..cffee4ccf9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -264,7 +264,7 @@ public class ConfigManager implements IManager {
dataSet.setTriggerInformation(
triggerManager.getTriggerTable(false).getAllTriggerInformation());
dataSet.setAllUDFInformation(udfManager.getUDFTable().getAllUDFInformation());
- dataSet.setAllDatabasesInformation(clusterSchemaManager.getAllTTLInfo());
+ dataSet.setAllTTLInformation(clusterSchemaManager.getAllTTLInfo());
} finally {
triggerManager.getTriggerInfo().releaseTriggerTableLock();
udfManager.getUdfInfo().releaseUDFTableLock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 94ae35c0cd..27079e5eb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -70,12 +71,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -116,6 +119,9 @@ public class StorageEngineV2 implements IService {
private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap =
new ConcurrentHashMap<>();
+ /** Database name -> ttl, for region recovery only */
+ private final Map<String, Long> ttlMapForRecover = new ConcurrentHashMap<>();
+
/** number of ready data region */
private AtomicInteger readyDataRegionNum;
@@ -189,6 +195,19 @@ public class StorageEngineV2 implements IService {
}
}
+ public void UpdateTTLInfo(byte[] allTTLInformation) {
+ if (allTTLInformation == null) {
+ return;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation);
+ int mapSize = ReadWriteIOUtils.readInt(buffer);
+ for (int i = 0; i < mapSize; i++) {
+ ttlMapForRecover.put(
+ Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)),
+ ReadWriteIOUtils.readLong(buffer));
+ }
+ }
+
public boolean isAllSgReady() {
return isAllSgReady.get();
}
@@ -240,7 +259,11 @@ public class StorageEngineV2 implements IService {
() -> {
DataRegion dataRegion = null;
try {
- dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+ dataRegion =
+ buildNewDataRegion(
+ sgName,
+ dataRegionId,
+ ttlMapForRecover.getOrDefault(sgName, Long.MAX_VALUE));
} catch (DataRegionException e) {
logger.error(
"Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 25b74c5971..ad11794b52 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -210,6 +210,9 @@ public class DataNode implements DataNodeMBean {
// store triggerInformationList
getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
+ // store ttl information
+ StorageEngineV2.getInstance().UpdateTTLInfo(dataNodeRegisterResp.getAllTTLInformation());
+
if (dataNodeRegisterResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| dataNodeRegisterResp.getStatus().getCode()
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 8fa1737a3f..cbd58d9588 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -39,7 +39,7 @@ struct TDataNodeRegisterResp {
7: optional list<binary> allTriggerInformation
8: optional TCQConfig cqConfig
9: optional list<binary> allUDFInformation
- 10: optional list<TStorageGroupInfo> allDatabaseInformation
+ 10: optional binary allTTLInformation
}
struct TGlobalConfig {