You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/16 15:27:30 UTC
[iotdb] branch master updated: [IOTDB-4965] Fix ttl info lost in data region after datanode restarted (#8018)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 31e93324ea [IOTDB-4965] Fix ttl info lost in data region after datanode restarted (#8018)
31e93324ea is described below
commit 31e93324eaa8ace62c3c0b9e1030b58c87e0cb7b
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Nov 16 23:27:24 2022 +0800
[IOTDB-4965] Fix ttl info lost in data region after datanode restarted (#8018)
---
.../consensus/response/DataNodeRegisterResp.java | 20 +++++++++++++++++
.../confignode/manager/ClusterSchemaManager.java | 17 ++++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 1 +
.../apache/iotdb/db/engine/StorageEngineV2.java | 26 +++++++++++++++++++++-
.../java/org/apache/iotdb/db/service/DataNode.java | 3 +++
.../src/main/thrift/confignode.thrift | 1 +
6 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index a4d9a13b72..bf4e64f57c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -26,9 +26,13 @@ import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
public class DataNodeRegisterResp implements DataSet {
@@ -43,6 +47,8 @@ public class DataNodeRegisterResp implements DataSet {
private List<ByteBuffer> allTriggerInformation;
private List<ByteBuffer> allUDFInformation;
+ private byte[] allTTLInformation;
+
public DataNodeRegisterResp() {
this.dataNodeId = null;
this.globalConfig = null;
@@ -92,6 +98,19 @@ public class DataNodeRegisterResp implements DataSet {
this.allUDFInformation = allUDFInformation;
}
+ public void setAllTTLInformation(Map<String, Long> allTTLInformation) {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try {
+ ReadWriteIOUtils.write(allTTLInformation.size(), outputStream);
+ for (Map.Entry<String, Long> entry : allTTLInformation.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ } catch (IOException ignored) {
+ }
+ this.allTTLInformation = outputStream.toByteArray();
+ }
+
public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
resp.setStatus(status);
@@ -107,6 +126,7 @@ public class DataNodeRegisterResp implements DataSet {
resp.setCqConfig(cqConfig);
resp.setAllTriggerInformation(allTriggerInformation);
resp.setAllUDFInformation(allUDFInformation);
+ resp.setAllTTLInformation(allTTLInformation);
}
return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 008bb69b11..7c11cf96ff 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -82,6 +82,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -206,6 +207,22 @@ public class ClusterSchemaManager {
return new TShowStorageGroupResp().setStorageGroupInfoMap(infoMap).setStatus(StatusUtils.OK);
}
+ public Map<String, Long> getAllTTLInfo() {
+ StorageGroupSchemaResp storageGroupSchemaResp =
+ (StorageGroupSchemaResp)
+ getMatchedStorageGroupSchema(new GetStorageGroupPlan(Arrays.asList("root", "**")));
+ Map<String, Long> infoMap = new ConcurrentHashMap<>();
+ if (storageGroupSchemaResp.getStatus().getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Return immediately if some StorageGroups don't exist
+ return infoMap;
+ }
+ for (TStorageGroupSchema storageGroupSchema : storageGroupSchemaResp.getSchemaMap().values()) {
+ infoMap.put(storageGroupSchema.getName(), storageGroupSchema.getTTL());
+ }
+ return infoMap;
+ }
+
/**
* Update TTL for the specific StorageGroup or all databases in a path
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 747a5c00f2..91893e5c48 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -264,6 +264,7 @@ public class ConfigManager implements IManager {
dataSet.setTriggerInformation(
triggerManager.getTriggerTable(false).getAllTriggerInformation());
dataSet.setAllUDFInformation(udfManager.getUDFTable().getAllUDFInformation());
+ dataSet.setAllTTLInformation(clusterSchemaManager.getAllTTLInfo());
} finally {
triggerManager.getTriggerInfo().releaseTriggerTableLock();
udfManager.getUdfInfo().releaseUDFTableLock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index edc2483679..ac4492cbc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -70,12 +71,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -116,6 +119,9 @@ public class StorageEngineV2 implements IService {
private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap =
new ConcurrentHashMap<>();
+ /** Database name -> ttl, for region recovery only */
+ private final Map<String, Long> ttlMapForRecover = new ConcurrentHashMap<>();
+
/** number of ready data region */
private AtomicInteger readyDataRegionNum;
@@ -189,6 +195,19 @@ public class StorageEngineV2 implements IService {
}
}
+ public void updateTTLInfo(byte[] allTTLInformation) {
+ if (allTTLInformation == null) {
+ return;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation);
+ int mapSize = ReadWriteIOUtils.readInt(buffer);
+ for (int i = 0; i < mapSize; i++) {
+ ttlMapForRecover.put(
+ Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)),
+ ReadWriteIOUtils.readLong(buffer));
+ }
+ }
+
public boolean isAllSgReady() {
return isAllSgReady.get();
}
@@ -222,6 +241,7 @@ public class StorageEngineV2 implements IService {
() -> {
checkResults(futures, "StorageEngine failed to recover.");
setAllSgReady(true);
+ ttlMapForRecover.clear();
});
recoverEndTrigger.start();
}
@@ -240,7 +260,11 @@ public class StorageEngineV2 implements IService {
() -> {
DataRegion dataRegion = null;
try {
- dataRegion = buildNewDataRegion(sgName, dataRegionId, Long.MAX_VALUE);
+ dataRegion =
+ buildNewDataRegion(
+ sgName,
+ dataRegionId,
+ ttlMapForRecover.getOrDefault(sgName, Long.MAX_VALUE));
} catch (DataRegionException e) {
logger.error(
"Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 25b74c5971..4a85a5304e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -210,6 +210,9 @@ public class DataNode implements DataNodeMBean {
// store triggerInformationList
getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
+ // store ttl information
+ StorageEngineV2.getInstance().updateTTLInfo(dataNodeRegisterResp.getAllTTLInformation());
+
if (dataNodeRegisterResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| dataNodeRegisterResp.getStatus().getCode()
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index f88d36f301..cbd58d9588 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -39,6 +39,7 @@ struct TDataNodeRegisterResp {
7: optional list<binary> allTriggerInformation
8: optional TCQConfig cqConfig
9: optional list<binary> allUDFInformation
+ 10: optional binary allTTLInformation
}
struct TGlobalConfig {