You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/12 09:02:25 UTC
[iotdb] branch master updated: [IOTDB-3352][IOTDB-3782] Fix the cache problem in PartitionCache and Optimize. (#6624)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 203b15bae6 [IOTDB-3352][IOTDB-3782] Fix the cache problem in PartitionCache and Optimize. (#6624)
203b15bae6 is described below
commit 203b15bae6a007416f5ad8bd40f64571b238a5d4
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Tue Jul 12 17:02:18 2022 +0800
[IOTDB-3352][IOTDB-3782] Fix the cache problem in PartitionCache and Optimize. (#6624)
---
.../resources/conf/iotdb-datanode.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 610 +++-------------
.../db/mpp/plan/analyze/cache/PartitionCache.java | 786 +++++++++++++++++++++
.../analyze/cache/StorageGroupCacheResult.java | 68 ++
.../datanode1conf/iotdb-datanode.properties | 2 +-
.../datanode2conf/iotdb-datanode.properties | 2 +-
.../datanode3conf/iotdb-datanode.properties | 2 +-
8 files changed, 971 insertions(+), 503 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 65355f51ad..0bbdb3c64e 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1044,7 +1044,7 @@ timestamp_precision=ms
# cache size for partition.
# This cache is used to improve partition fetch from config node.
# Datatype: int
-# partition_cache_size=0
+# partition_cache_size=1000
####################
### Schema File Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a2c8ce8faa..697d87469b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -905,7 +905,7 @@ public class IoTDBConfig {
* Cache size of partition cache in {@link
* org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher}
*/
- private int partitionCacheSize = 0;
+ private int partitionCacheSize = 1000;
/** Cache size of user and role */
private int authorCacheSize = 100;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 1e1e5c00f0..6d69c095bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -20,44 +20,34 @@ package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
-import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
-import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.service.metrics.recorder.CacheMetricsRecorder;
+import org.apache.iotdb.db.mpp.plan.analyze.cache.PartitionCache;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,23 +55,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class ClusterPartitionFetcher implements IPartitionFetcher {
private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private static final List<String> ROOT_PATH = Arrays.asList("root", "**");
private final SeriesPartitionExecutor partitionExecutor;
@@ -105,9 +87,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
this.partitionExecutor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
- this.partitionCache =
- new PartitionCache(
- config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
+ this.partitionCache = new PartitionCache();
}
@Override
@@ -116,15 +96,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
patternTree.constructTree();
List<String> devicePaths = patternTree.getAllDevicePatterns();
- Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths, false);
- SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+ Map<String, List<String>> storageGroupToDeviceMap =
+ partitionCache.getStorageGroupToDevice(devicePaths, false);
+ SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
if (null == schemaPartition) {
- TSchemaPartitionResp schemaPartitionResp =
- client.getSchemaPartition(constructSchemaPartitionReq(patternTree));
- if (schemaPartitionResp.getStatus().getCode()
+ TSchemaPartitionTableResp schemaPartitionTableResp =
+ client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
+ if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
- partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+ schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
+ partitionCache.updateSchemaPartitionCache(
+ schemaPartitionTableResp.getSchemaPartitionTable());
+ } else {
+ throw new RuntimeException(
+ new IoTDBException(
+ schemaPartitionTableResp.getStatus().getMessage(),
+ schemaPartitionTableResp.getStatus().getCode()));
}
}
return schemaPartition;
@@ -140,20 +127,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
patternTree.constructTree();
List<String> devicePaths = patternTree.getAllDevicePatterns();
- Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths, true);
- SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+ Map<String, List<String>> storageGroupToDeviceMap =
+ partitionCache.getStorageGroupToDevice(devicePaths, true);
+ SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
if (null == schemaPartition) {
- TSchemaPartitionResp schemaPartitionResp =
- client.getOrCreateSchemaPartition(constructSchemaPartitionReq(patternTree));
- if (schemaPartitionResp.getStatus().getCode()
+ TSchemaPartitionTableResp schemaPartitionTableResp =
+ client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
+ if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
- partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+ schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
+ partitionCache.updateSchemaPartitionCache(
+ schemaPartitionTableResp.getSchemaPartitionTable());
} else {
throw new RuntimeException(
new IoTDBException(
- schemaPartitionResp.getStatus().getMessage(),
- schemaPartitionResp.getStatus().getCode()));
+ schemaPartitionTableResp.getStatus().getMessage(),
+ schemaPartitionTableResp.getStatus().getCode()));
}
}
return schemaPartition;
@@ -185,16 +174,21 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- TDataPartitionResp dataPartitionResp =
- client.getDataPartition(constructDataPartitionReq(sgNameToQueryParamsMap));
- if (dataPartitionResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return parseDataPartitionResp(dataPartitionResp);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+
+ if (dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return parseDataPartitionResp(dataPartitionTableResp);
+ } else {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getDataPartition():"
+ + dataPartitionTableResp.getStatus().getMessage());
}
} catch (TException | IOException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getDataPartition():" + e.getMessage());
}
- return null;
}
@Override
@@ -205,12 +199,18 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
splitDataPartitionQueryParam(dataPartitionQueryParams, false);
DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
if (null == dataPartition) {
- TDataPartitionResp dataPartitionResp =
- client.getDataPartition(constructDataPartitionReq(splitDataPartitionQueryParams));
- if (dataPartitionResp.getStatus().getCode()
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getDataPartitionTable(constructDataPartitionReq(splitDataPartitionQueryParams));
+
+ if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataPartition = parseDataPartitionResp(dataPartitionResp);
- partitionCache.updateDataPartitionCache(dataPartition);
+ dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+ partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+ } else {
+ throw new RuntimeException(
+ new IoTDBException(
+ dataPartitionTableResp.getStatus().getMessage(),
+ dataPartitionTableResp.getStatus().getCode()));
}
}
return dataPartition;
@@ -226,14 +226,15 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
// Do not use data partition cache
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- TDataPartitionResp dataPartitionResp =
- client.getOrCreateDataPartition(constructDataPartitionReq(sgNameToQueryParamsMap));
- if (dataPartitionResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return parseDataPartitionResp(dataPartitionResp);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+ if (dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return parseDataPartitionResp(dataPartitionTableResp);
} else {
throw new StatementAnalyzeException(
"An error occurred when executing getOrCreateDataPartition():"
- + dataPartitionResp.getStatus().getMessage());
+ + dataPartitionTableResp.getStatus().getMessage());
}
} catch (TException | IOException e) {
throw new StatementAnalyzeException(
@@ -250,13 +251,19 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
splitDataPartitionQueryParam(dataPartitionQueryParams, true);
DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
if (null == dataPartition) {
- TDataPartitionResp dataPartitionResp =
- client.getOrCreateDataPartition(
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(
constructDataPartitionReq(splitDataPartitionQueryParams));
- if (dataPartitionResp.getStatus().getCode()
+
+ if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataPartition = parseDataPartitionResp(dataPartitionResp);
- partitionCache.updateDataPartitionCache(dataPartition);
+ dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+ partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+ } else {
+ throw new RuntimeException(
+ new IoTDBException(
+ dataPartitionTableResp.getStatus().getMessage(),
+ dataPartitionTableResp.getStatus().getCode()));
}
}
return dataPartition;
@@ -268,86 +275,12 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
@Override
public boolean updateRegionCache(TRegionRouteReq req) {
- return partitionCache.updateGroupIdToReplicaSetMap(req);
+ return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), req.getRegionRouteMap());
}
@Override
public void invalidAllCache() {
- logger.debug("Invalidate partition cache");
- partitionCache.storageGroupCache.clear();
- partitionCache.invalidAllDataPartitionCache();
- partitionCache.invalidAllSchemaPartitionCache();
- logger.debug("PartitionCache is invalid:{}", partitionCache);
- }
-
- /** get deviceToStorageGroup map */
- private Map<String, String> getDeviceToStorageGroup(
- List<String> devicePaths, boolean isAutoCreate) {
- Map<String, String> deviceToStorageGroup = new HashMap<>();
- // miss when devicePath contains *
- for (String devicePath : devicePaths) {
- if (devicePath.contains("*")) {
- return deviceToStorageGroup;
- }
- }
- // first try to hit cache
- boolean firstTryResult = partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup);
- if (!firstTryResult) {
- try (ConfigNodeClient client =
- configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- TStorageGroupSchemaResp storageGroupSchemaResp =
- client.getMatchedStorageGroupSchemas(ROOT_PATH);
- if (storageGroupSchemaResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- Set<String> storageGroupNames =
- storageGroupSchemaResp.getStorageGroupSchemaMap().keySet();
- // update all storage group into cache
- partitionCache.updateStorageCache(storageGroupNames);
- // second try to hit cache
- deviceToStorageGroup = new HashMap<>();
- boolean secondTryResult =
- partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup);
- if (!secondTryResult && isAutoCreate) {
- // try to auto create storage group
- Set<String> storageGroupNamesNeedCreated = new HashSet<>();
- for (String devicePath : devicePaths) {
- if (!deviceToStorageGroup.containsKey(devicePath)) {
- PartialPath storageGroupNameNeedCreated =
- MetaUtils.getStorageGroupPathByLevel(
- new PartialPath(devicePath), config.getDefaultStorageGroupLevel());
- storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
- }
- }
- Set<String> successFullyCreatedStorageGroup = new HashSet<>();
- for (String storageGroupName : storageGroupNamesNeedCreated) {
- TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
- storageGroupSchema.setName(storageGroupName);
- TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
- TSStatus tsStatus = client.setStorageGroup(req);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
- successFullyCreatedStorageGroup.add(storageGroupName);
- } else {
- partitionCache.updateStorageCache(successFullyCreatedStorageGroup);
- throw new RuntimeException(new IoTDBException(tsStatus.message, tsStatus.code));
- }
- }
- partitionCache.updateStorageCache(storageGroupNamesNeedCreated);
- // third try to hit cache
- deviceToStorageGroup = new HashMap<>();
- boolean thirdTryResult =
- partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup);
- if (!thirdTryResult) {
- throw new StatementAnalyzeException(
- "Failed to get Storage Group Map when executing getOrCreateDataPartition()");
- }
- }
- }
- } catch (TException | MetadataException | IOException e) {
- throw new StatementAnalyzeException(
- "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
- }
- }
- return deviceToStorageGroup;
+ partitionCache.invalidAllCache();
}
/** split data partition query param by storage group */
@@ -357,13 +290,13 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
devicePaths.add(dataPartitionQueryParam.getDevicePath());
}
- Map<String, String> deviceToStorageGroup = getDeviceToStorageGroup(devicePaths, isAutoCreate);
-
+ Map<String, String> deviceToStorageGroupMap =
+ partitionCache.getDeviceToStorageGroup(devicePaths, isAutoCreate);
Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
String devicePath = dataPartitionQueryParam.getDevicePath();
- if (deviceToStorageGroup.containsKey(devicePath)) {
- String storageGroup = deviceToStorageGroup.get(devicePath);
+ if (deviceToStorageGroupMap.containsKey(devicePath)) {
+ String storageGroup = deviceToStorageGroupMap.get(devicePath);
if (!result.containsKey(storageGroup)) {
result.put(storageGroup, new ArrayList<>());
}
@@ -432,9 +365,23 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
return new TDataPartitionReq(partitionSlotsMap);
}
- private SchemaPartition parseSchemaPartitionResp(TSchemaPartitionResp schemaPartitionResp) {
+ private SchemaPartition parseSchemaPartitionTableResp(
+ TSchemaPartitionTableResp schemaPartitionTableResp) {
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> regionReplicaMap = new HashMap<>();
+ for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> entry1 :
+ schemaPartitionTableResp.getSchemaPartitionTable().entrySet()) {
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> result1 =
+ regionReplicaMap.computeIfAbsent(entry1.getKey(), k -> new HashMap<>());
+ for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> entry2 :
+ entry1.getValue().entrySet()) {
+ TSeriesPartitionSlot seriesPartitionSlot = entry2.getKey();
+ TConsensusGroupId consensusGroupId = entry2.getValue();
+ result1.put(seriesPartitionSlot, partitionCache.getRegionReplicaSet(consensusGroupId));
+ }
+ }
+
return new SchemaPartition(
- schemaPartitionResp.getSchemaRegionMap(),
+ regionReplicaMap,
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
}
@@ -448,365 +395,32 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
schemaNodeManagementResp.getMatchedNode());
}
- private DataPartition parseDataPartitionResp(TDataPartitionResp dataPartitionResp) {
- return new DataPartition(
- dataPartitionResp.getDataPartitionMap(),
- config.getSeriesPartitionExecutorClass(),
- config.getSeriesPartitionSlotNum());
- }
-
- private class PartitionCache {
- /** the size of partitionCache */
- private final int cacheSize = config.getPartitionCacheSize();
- /** the cache of storage group */
- private final Set<String> storageGroupCache = Collections.synchronizedSet(new HashSet<>());
- /** the lock of storage group cache */
- private final ReentrantReadWriteLock storageGroupCacheLock = new ReentrantReadWriteLock();
- /** device -> regionReplicaSet */
- private final Cache<String, TRegionReplicaSet> schemaPartitionCache;
- /** the lock of schemaPartition cache * */
- private final ReentrantReadWriteLock schemaPartitionCacheLock = new ReentrantReadWriteLock();
- /** seriesPartitionSlot, timesereisPartitionSlot -> regionReplicaSets * */
- private final Cache<DataPartitionCacheKey, List<TRegionReplicaSet>> dataPartitionCache;
- /** the lock of dataPartition cache */
- private final ReentrantReadWriteLock dataPartitionCacheLock = new ReentrantReadWriteLock();
- /** calculate slotId by device */
- private final String seriesSlotExecutorName;
-
- private final int seriesPartitionSlotNum;
-
- /** the latest time when groupIdToReplicaSetMap updated. */
- private final AtomicLong latestUpdateTime = new AtomicLong(0);
- /** TConsensusGroupId -> TRegionReplicaSet */
- private final Map<TConsensusGroupId, TRegionReplicaSet> groupIdToReplicaSetMap =
- new ConcurrentHashMap<>();
-
- public PartitionCache(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
- this.seriesSlotExecutorName = seriesSlotExecutorName;
- this.seriesPartitionSlotNum = seriesPartitionSlotNum;
- this.schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
- this.dataPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
- }
-
- /** get storage group by cache */
- public boolean getStorageGroup(
- List<String> devicePaths, Map<String, String> deviceToStorageGroupMap) {
- storageGroupCacheLock.readLock().lock();
- try {
- boolean result = true;
- if (storageGroupCache.size() == 0) {
- logger.debug("Failed to get storage group");
- result = false;
- } else {
- for (String devicePath : devicePaths) {
- boolean hit = false;
- synchronized (storageGroupCache) {
- for (String storageGroup : storageGroupCache) {
- if (PathUtils.isStartWith(devicePath, storageGroup)) {
- deviceToStorageGroupMap.put(devicePath, storageGroup);
- hit = true;
- break;
- }
- }
- }
- if (!hit) {
- logger.debug("{} cannot hit storage group cache", devicePath);
- result = false;
- break;
- }
+ private DataPartition parseDataPartitionResp(TDataPartitionTableResp dataPartitionTableResp) {
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ regionReplicaSet = new HashMap<>();
+ for (Map.Entry<
+ String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+ entry1 : dataPartitionTableResp.getDataPartitionTable().entrySet()) {
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> result1 =
+ regionReplicaSet.computeIfAbsent(entry1.getKey(), k -> new HashMap<>());
+ for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
+ entry2 : entry1.getValue().entrySet()) {
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> result2 =
+ result1.computeIfAbsent(entry2.getKey(), k -> new HashMap<>());
+ for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry3 :
+ entry2.getValue().entrySet()) {
+ List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
+ for (TConsensusGroupId consensusGroupId : entry3.getValue()) {
+ regionReplicaSets.add(partitionCache.getRegionReplicaSet(consensusGroupId));
}
+ result2.put(entry3.getKey(), regionReplicaSets);
}
- CacheMetricsRecorder.record(result, "StorageGroup");
- return result;
- } finally {
- storageGroupCacheLock.readLock().unlock();
}
}
- /** update the cache of storage group */
- public void updateStorageCache(Set<String> storageGroupNames) {
- storageGroupCacheLock.writeLock().lock();
- try {
- storageGroupCache.addAll(storageGroupNames);
- } finally {
- storageGroupCacheLock.writeLock().unlock();
- }
- }
-
- /** invalid storage group after delete */
- public void invalidStorageGroupCache(List<String> storageGroupNames) {
- storageGroupCacheLock.writeLock().lock();
- try {
- for (String storageGroupName : storageGroupNames) {
- storageGroupCache.remove(storageGroupName);
- }
- } finally {
- storageGroupCacheLock.writeLock().unlock();
- }
- }
-
- /** get schemaPartition by patternTree */
- public SchemaPartition getSchemaPartition(Map<String, String> deviceToStorageGroupMap) {
- schemaPartitionCacheLock.readLock().lock();
- try {
- String name = "SchemaPartition";
- if (deviceToStorageGroupMap.size() == 0) {
- CacheMetricsRecorder.record(false, name);
- return null;
- }
- Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
- new HashMap<>();
- // check cache for each device
- for (Map.Entry<String, String> entry : deviceToStorageGroupMap.entrySet()) {
- String device = entry.getKey();
- TSeriesPartitionSlot seriesPartitionSlot =
- partitionExecutor.getSeriesPartitionSlot(device);
- TRegionReplicaSet regionReplicaSet = schemaPartitionCache.getIfPresent(device);
- if (null == regionReplicaSet) {
- // if one device not find, then return cache miss.
- logger.debug("Failed to find schema partition");
- CacheMetricsRecorder.record(false, name);
- return null;
- }
- String storageGroupName = deviceToStorageGroupMap.get(device);
- if (!schemaPartitionMap.containsKey(storageGroupName)) {
- schemaPartitionMap.put(storageGroupName, new HashMap<>());
- }
- Map<TSeriesPartitionSlot, TRegionReplicaSet> regionReplicaSetMap =
- schemaPartitionMap.get(storageGroupName);
- regionReplicaSetMap.put(seriesPartitionSlot, regionReplicaSet);
- }
- logger.debug("Hit schema partition");
- // cache hit
- CacheMetricsRecorder.record(true, name);
- return new SchemaPartition(
- schemaPartitionMap, seriesSlotExecutorName, seriesPartitionSlotNum);
- } finally {
- schemaPartitionCacheLock.readLock().unlock();
- }
- }
-
- /** get dataPartition by query param map */
- public DataPartition getDataPartition(
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
- dataPartitionCacheLock.readLock().lock();
- try {
- String name = "DataPartition";
- if (sgNameToQueryParamsMap.size() == 0) {
- CacheMetricsRecorder.record(false, name);
- return null;
- }
- Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
- dataPartitionMap = new HashMap<>();
- // check cache for each storage group
- for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
- sgNameToQueryParamsMap.entrySet()) {
- String storageGroupName = entry.getKey();
- if (!dataPartitionMap.containsKey(storageGroupName)) {
- dataPartitionMap.put(storageGroupName, new HashMap<>());
- }
- Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
- seriesSlotToTimePartitionMap = dataPartitionMap.get(storageGroupName);
- // check cache for each query param
- for (DataPartitionQueryParam dataPartitionQueryParam : entry.getValue()) {
- if (null == dataPartitionQueryParam.getTimePartitionSlotList()
- || 0 == dataPartitionQueryParam.getTimePartitionSlotList().size()) {
- // if query all data, cache miss
- logger.debug("Failed to find data partition");
- CacheMetricsRecorder.record(false, name);
- return null;
- }
- TSeriesPartitionSlot seriesPartitionSlot =
- partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath());
- if (!seriesSlotToTimePartitionMap.containsKey(seriesPartitionSlot)) {
- seriesSlotToTimePartitionMap.put(seriesPartitionSlot, new HashMap<>());
- }
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotListMap =
- seriesSlotToTimePartitionMap.get(seriesPartitionSlot);
- // check cache for each time partition
- for (TTimePartitionSlot timePartitionSlot :
- dataPartitionQueryParam.getTimePartitionSlotList()) {
- DataPartitionCacheKey dataPartitionCacheKey =
- new DataPartitionCacheKey(seriesPartitionSlot, timePartitionSlot);
- List<TRegionReplicaSet> regionReplicaSets =
- dataPartitionCache.getIfPresent(dataPartitionCacheKey);
- if (null == regionReplicaSets) {
- // if one time partition not find, cache miss
- logger.debug("Failed to find data partition");
- CacheMetricsRecorder.record(false, name);
- return null;
- }
- timePartitionSlotListMap.put(timePartitionSlot, regionReplicaSets);
- }
- }
- }
- logger.debug("Hit data partition");
- // cache hit
- CacheMetricsRecorder.record(true, name);
- return new DataPartition(dataPartitionMap, seriesSlotExecutorName, seriesPartitionSlotNum);
- } finally {
- dataPartitionCacheLock.readLock().unlock();
- }
- }
-
- /** update schemaPartitionCache by schemaPartition. */
- public void updateSchemaPartitionCache(List<String> devices, SchemaPartition schemaPartition) {
- schemaPartitionCacheLock.writeLock().lock();
- try {
- Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> storageGroupPartitionMap =
- schemaPartition.getSchemaPartitionMap();
- Set<String> storageGroupNames = storageGroupPartitionMap.keySet();
- for (String device : devices) {
- if (!device.contains("*")) {
- String storageGroup = null;
- for (String storageGroupName : storageGroupNames) {
- if (PathUtils.isStartWith(device, storageGroupName)) {
- storageGroup = storageGroupName;
- break;
- }
- }
- if (null == storageGroup) {
- // device not exist
- continue;
- }
- TSeriesPartitionSlot seriesPartitionSlot =
- partitionExecutor.getSeriesPartitionSlot(device);
- TRegionReplicaSet regionReplicaSet =
- storageGroupPartitionMap.get(storageGroup).getOrDefault(seriesPartitionSlot, null);
- if (null == regionReplicaSet) {
- logger.error(
- "Failed to get the regionReplicaSet of {} when update SchemaPartitionCache",
- device);
- continue;
- }
- schemaPartitionCache.put(device, regionReplicaSet);
- }
- }
- } finally {
- schemaPartitionCacheLock.writeLock().unlock();
- }
- }
-
- /** update dataPartitionCache by dataPartition */
- public void updateDataPartitionCache(DataPartition dataPartition) {
- dataPartitionCacheLock.writeLock().lock();
- try {
- for (Map.Entry<
- String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
- entry1 : dataPartition.getDataPartitionMap().entrySet()) {
- for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
- entry2 : entry1.getValue().entrySet()) {
- TSeriesPartitionSlot seriesPartitionSlot = entry2.getKey();
- for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry3 :
- entry2.getValue().entrySet()) {
- DataPartitionCacheKey dataPartitionCacheKey =
- new DataPartitionCacheKey(seriesPartitionSlot, entry3.getKey());
- dataPartitionCache.put(dataPartitionCacheKey, entry3.getValue());
- }
- }
- }
- } finally {
- dataPartitionCacheLock.writeLock().unlock();
- }
- }
-
- /** invalid schemaPartitionCache by device */
- public void invalidSchemaPartitionCache(String device) {
- // TODO should be called in two situation: 1. redirect status 2. config node trigger
- schemaPartitionCacheLock.writeLock().lock();
- try {
- schemaPartitionCache.invalidate(device);
- } finally {
- schemaPartitionCacheLock.writeLock().unlock();
- }
- }
-
- /** invalid dataPartitionCache by seriesPartitionSlot, timePartitionSlot */
- public void invalidDataPartitionCache(
- TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
- dataPartitionCacheLock.writeLock().lock();
- // TODO should be called in two situation: 1. redirect status 2. config node trigger
- try {
- DataPartitionCacheKey dataPartitionCacheKey =
- new DataPartitionCacheKey(seriesPartitionSlot, timePartitionSlot);
- dataPartitionCache.invalidate(dataPartitionCacheKey);
- } finally {
- dataPartitionCacheLock.writeLock().unlock();
- }
- }
-
- /** invalid schemaPartitionCache by device */
- public void invalidAllSchemaPartitionCache() {
- schemaPartitionCacheLock.writeLock().lock();
- try {
- schemaPartitionCache.invalidateAll();
- } finally {
- schemaPartitionCacheLock.writeLock().unlock();
- }
- }
-
- /** invalid dataPartitionCache by seriesPartitionSlot, timePartitionSlot */
- public void invalidAllDataPartitionCache() {
- dataPartitionCacheLock.writeLock().lock();
- try {
- dataPartitionCache.invalidateAll();
- } finally {
- dataPartitionCacheLock.writeLock().unlock();
- }
- }
-
- public boolean updateGroupIdToReplicaSetMap(TRegionRouteReq req) {
- long timestamp = req.getTimestamp();
- boolean result = (timestamp == latestUpdateTime.accumulateAndGet(timestamp, Math::max));
- // if timestamp is greater than latestUpdateTime, then update
- if (result) {
- groupIdToReplicaSetMap.clear();
- groupIdToReplicaSetMap.putAll(req.getRegionRouteMap());
- }
- return result;
- }
-
- @Override
- public String toString() {
- return "PartitionCache{"
- + "cacheSize="
- + cacheSize
- + ", storageGroupCache="
- + storageGroupCache
- + ", schemaPartitionCache="
- + schemaPartitionCache
- + ", dataPartitionCache="
- + dataPartitionCache
- + '}';
- }
- }
-
- private static class DataPartitionCacheKey {
- private TSeriesPartitionSlot seriesPartitionSlot;
- private TTimePartitionSlot timePartitionSlot;
-
- public DataPartitionCacheKey(
- TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
- this.seriesPartitionSlot = seriesPartitionSlot;
- this.timePartitionSlot = timePartitionSlot;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DataPartitionCacheKey that = (DataPartitionCacheKey) o;
- return Objects.equals(seriesPartitionSlot, that.seriesPartitionSlot)
- && Objects.equals(timePartitionSlot, that.timePartitionSlot);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(seriesPartitionSlot, timePartitionSlot);
- }
+ return new DataPartition(
+ regionReplicaSet,
+ config.getSeriesPartitionExecutorClass(),
+ config.getSeriesPartitionSlotNum());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
new file mode 100644
index 0000000000..bb9481570a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.cache;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.service.metrics.recorder.CacheMetricsRecorder;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class PartitionCache {
+ private static final Logger logger = LoggerFactory.getLogger(PartitionCache.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ /** path pattern passed to the config node when all storage group schemas are fetched */
+ private static final List<String> ROOT_PATH = Arrays.asList("root", "**");
+ /** cache names, used in log messages and when recording cache hit/miss metrics */
+ private static final String STORAGE_GROUP_CACHE_NAME = "StorageGroup";
+ private static final String SCHEMA_PARTITION_CACHE_NAME = "SchemaPartition";
+ private static final String DATA_PARTITION_CACHE_NAME = "DataPartition";
+
+ /** configured executor class name; the executor maps a device to its series partition slot */
+ private final String seriesSlotExecutorName = config.getSeriesPartitionExecutorClass();
+
+ /** configured number of series partition slots, handed to the executor factory */
+ private final int seriesPartitionSlotNum = config.getSeriesPartitionSlotNum();
+ /** built in the constructor from seriesSlotExecutorName and seriesPartitionSlotNum */
+ private final SeriesPartitionExecutor partitionExecutor;
+
+ /** maximum size applied to both the schema partition cache and the data partition cache */
+ private final int cacheSize = config.getPartitionCacheSize();
+ /** names of the known storage groups; not bounded by cacheSize */
+ private final Set<String> storageGroupCache = Collections.synchronizedSet(new HashSet<>());
+ /** storage group name -> schemaPartitionTable */
+ private final Cache<String, SchemaPartitionTable> schemaPartitionCache;
+ /** storage group name -> dataPartitionTable */
+ private final Cache<String, DataPartitionTable> dataPartitionCache;
+
+ /** the latest time when groupIdToReplicaSetMap updated. */
+ private final AtomicLong latestUpdateTime = new AtomicLong(0);
+ /** TConsensusGroupId -> TRegionReplicaSet */
+ private final Map<TConsensusGroupId, TRegionReplicaSet> groupIdToReplicaSetMap =
+ new ConcurrentHashMap<>();
+
+ /** one read/write lock per cache: storage group, schema partition and data partition */
+ private final ReentrantReadWriteLock storageGroupCacheLock = new ReentrantReadWriteLock();
+
+ private final ReentrantReadWriteLock schemaPartitionCacheLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock dataPartitionCacheLock = new ReentrantReadWriteLock();
+
+ /** client manager used to borrow config node clients for cache refreshes */
+ private final IClientManager<PartitionRegionId, ConfigNodeClient> configNodeClientManager =
+ new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+ .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+
+  /**
+   * Builds an empty partition cache: the series partition executor is created from the configured
+   * executor class and slot number, and both partition caches are bounded by {@code cacheSize}.
+   */
+  public PartitionCache() {
+    partitionExecutor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            seriesSlotExecutorName, seriesPartitionSlotNum);
+    dataPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+    schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+  }
+
+ // region storage group cache
+
+  /**
+   * Groups the given devices by the storage group they belong to.
+   *
+   * @param devicePaths the devices that need to hit
+   * @param isAutoCreate whether auto create storage group when cache miss
+   * @return storage group name -> devices of that storage group
+   */
+  public Map<String, List<String>> getStorageGroupToDevice(
+      List<String> devicePaths, boolean isAutoCreate) {
+    StorageGroupCacheResult<List<String>> groupedDevices =
+        new StorageGroupCacheResult<List<String>>() {
+          @Override
+          public void put(String device, String storageGroupName) {
+            // create the bucket on first use, then append the device to it
+            map.computeIfAbsent(storageGroupName, unused -> new ArrayList<>()).add(device);
+          }
+        };
+    getStorageGroupCacheResult(groupedDevices, devicePaths, isAutoCreate);
+    return groupedDevices.getMap();
+  }
+
+  /**
+   * Resolves the storage group of every given device.
+   *
+   * @param devicePaths the devices that need to hit
+   * @param isAutoCreate whether auto create storage group when cache miss
+   * @return device path -> storage group name
+   */
+  public Map<String, String> getDeviceToStorageGroup(
+      List<String> devicePaths, boolean isAutoCreate) {
+    StorageGroupCacheResult<String> deviceToGroup =
+        new StorageGroupCacheResult<String>() {
+          @Override
+          public void put(String device, String storageGroupName) {
+            // one storage group per device
+            map.put(device, storageGroupName);
+          }
+        };
+    getStorageGroupCacheResult(deviceToGroup, devicePaths, isAutoCreate);
+    return deviceToGroup.getMap();
+  }
+
+  /**
+   * Looks up the cached storage group whose name the device path starts with.
+   *
+   * @param devicePath the path of device
+   * @return storage group name, or null if no cached storage group matches
+   */
+  private String getStorageGroupName(String devicePath) {
+    // traversing a synchronized set has to happen while holding its monitor
+    synchronized (storageGroupCache) {
+      return storageGroupCache.stream()
+          .filter(candidate -> PathUtils.isStartWith(devicePath, candidate))
+          .findFirst()
+          .orElse(null);
+    }
+  }
+
+  /**
+   * Fetches all storage groups from the config node and adds them to the storage group cache.
+   *
+   * <p>The cache is re-checked under the write lock first, so the config node is only contacted
+   * when the devices still miss. A non-success response leaves the cache untouched.
+   *
+   * @param result the result of get storage group cache; reset and refilled by this method
+   * @param devicePaths the devices that need to hit
+   * @throws IOException if the config node client cannot be borrowed or used
+   * @throws TException if the RPC to the config node fails
+   */
+  private void fetchStorageGroupAndUpdateCache(
+      StorageGroupCacheResult<?> result, List<String> devicePaths) throws IOException, TException {
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      // Lock only after the client has been borrowed and pair the unlock with its own try block:
+      // if borrowClient throws, no unlock must run for a lock that was never acquired.
+      storageGroupCacheLock.writeLock().lock();
+      try {
+        result.reset();
+        // another thread may have refreshed the cache while we waited for the write lock
+        getStorageGroupMap(result, devicePaths, true);
+        if (!result.isSuccess()) {
+          TStorageGroupSchemaResp storageGroupSchemaResp =
+              client.getMatchedStorageGroupSchemas(ROOT_PATH);
+          if (storageGroupSchemaResp.getStatus().getCode()
+              == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            Set<String> storageGroupNames =
+                storageGroupSchemaResp.getStorageGroupSchemaMap().keySet();
+            // update all storage group into cache
+            updateStorageCache(storageGroupNames);
+          }
+        }
+      } finally {
+        storageGroupCacheLock.writeLock().unlock();
+      }
+    }
+  }
+
+  /**
+   * Creates the storage groups that are still missing for the given devices and adds them to the
+   * storage group cache.
+   *
+   * <p>The name of each storage group to create is derived from the missed device path and the
+   * configured default storage group level. Storage groups are created one by one; on the first
+   * failure the ones created so far are cached and a {@link RuntimeException} is thrown.
+   *
+   * @param result the result of get storage group cache; reset and refilled by this method
+   * @param devicePaths the devices that need to hit
+   * @throws RuntimeException if failed to create storage group
+   */
+  private void createStorageGroupAndUpdateCache(
+      StorageGroupCacheResult<?> result, List<String> devicePaths)
+      throws IOException, MetadataException, TException {
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      // Lock only after the client has been borrowed and pair the unlock with its own try block:
+      // if borrowClient throws, no unlock must run for a lock that was never acquired.
+      storageGroupCacheLock.writeLock().lock();
+      try {
+        // try to check whether storage group need to be created
+        result.reset();
+        // try to hit storage group with all missed devices
+        getStorageGroupMap(result, devicePaths, false);
+        if (!result.isSuccess()) {
+          // try to get storage group needed to be created from missed device
+          Set<String> storageGroupNamesNeedCreated = new HashSet<>();
+          for (String devicePath : result.getMissedDevices()) {
+            PartialPath storageGroupNameNeedCreated =
+                MetaUtils.getStorageGroupPathByLevel(
+                    new PartialPath(devicePath), config.getDefaultStorageGroupLevel());
+            storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
+          }
+
+          // try to create storage groups one by one until done or one storage group fail
+          Set<String> successFullyCreatedStorageGroup = new HashSet<>();
+          for (String storageGroupName : storageGroupNamesNeedCreated) {
+            TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
+            storageGroupSchema.setName(storageGroupName);
+            TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
+            TSStatus tsStatus = client.setStorageGroup(req);
+            if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
+              successFullyCreatedStorageGroup.add(storageGroupName);
+            } else {
+              // try to update cache by storage groups successfully created
+              updateStorageCache(successFullyCreatedStorageGroup);
+              logger.warn(
+                  "[{} Cache] failed to create storage group {}",
+                  STORAGE_GROUP_CACHE_NAME,
+                  storageGroupName);
+              throw new RuntimeException(new IoTDBException(tsStatus.message, tsStatus.code));
+            }
+          }
+          // try to update storage group cache when all storage groups has already been created
+          updateStorageCache(storageGroupNamesNeedCreated);
+        }
+      } finally {
+        storageGroupCacheLock.writeLock().unlock();
+      }
+    }
+  }
+
+  /**
+   * Tries once to resolve the storage group of every device from the storage group cache.
+   *
+   * <p>The result is reset first. On any miss the result is marked failed (which clears its map);
+   * the missed devices are only collected when {@code failFast} is false. A hit or miss is
+   * recorded in the cache metrics.
+   *
+   * @param result contains result(boolean), failed devices and the map
+   * @param devicePaths the devices that need to hit
+   * @param failFast if true, stop at the first miss. if false, check all devices and collect
+   *     every missed one
+   */
+  private void getStorageGroupMap(
+      StorageGroupCacheResult<?> result, List<String> devicePaths, boolean failFast) {
+    // acquire outside the try block so the finally clause never unlocks a lock that is not held
+    storageGroupCacheLock.readLock().lock();
+    try {
+      // reset result before try
+      result.reset();
+      boolean allHit = true;
+      for (String devicePath : devicePaths) {
+        String storageGroupName = getStorageGroupName(devicePath);
+        if (null == storageGroupName) {
+          logger.debug(
+              "[{} Cache] miss when search device {}", STORAGE_GROUP_CACHE_NAME, devicePath);
+          allHit = false;
+          if (failFast) {
+            break;
+          } else {
+            result.addMissedDevice(devicePath);
+          }
+        } else {
+          result.put(devicePath, storageGroupName);
+        }
+      }
+      if (allHit) {
+        // report the hit only when every device was really found
+        logger.debug(
+            "[{} Cache] hit when search device {}", STORAGE_GROUP_CACHE_NAME, devicePaths);
+      } else {
+        // setFailed the result when miss
+        result.setFailed();
+      }
+      CacheMetricsRecorder.record(allHit, STORAGE_GROUP_CACHE_NAME);
+    } finally {
+      storageGroupCacheLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Resolves the storage groups of the given devices in up to three tries: from the local cache,
+   * after refreshing the cache from the config node, and (only when {@code isAutoCreate} is true)
+   * after creating the missing storage groups.
+   *
+   * <p>If any device path contains a wildcard ({@code *}) the method returns immediately and
+   * leaves {@code result} untouched.
+   *
+   * @param result contains result, failed devices and map
+   * @param devicePaths the devices that need to hit
+   * @param isAutoCreate whether auto create storage group when device miss
+   * @throws StatementAnalyzeException if the config node cannot be reached, or if the devices
+   *     still miss after storage groups were auto created
+   */
+  private void getStorageGroupCacheResult(
+      StorageGroupCacheResult<?> result, List<String> devicePaths, boolean isAutoCreate) {
+    // miss when devicePath contains *
+    for (String devicePath : devicePaths) {
+      if (devicePath.contains("*")) {
+        return;
+      }
+    }
+    // first try to hit storage group in fast-fail way
+    getStorageGroupMap(result, devicePaths, true);
+    if (!result.isSuccess()) {
+      try {
+        // try to fetch storage group from config node when miss
+        fetchStorageGroupAndUpdateCache(result, devicePaths);
+        // second try to hit storage group in fast-fail way
+        getStorageGroupMap(result, devicePaths, true);
+        if (!result.isSuccess() && isAutoCreate) {
+          // try to auto create storage group of failed device
+          createStorageGroupAndUpdateCache(result, devicePaths);
+          // third try to hit storage group in fast-fail way
+          getStorageGroupMap(result, devicePaths, true);
+          if (!result.isSuccess()) {
+            throw new StatementAnalyzeException("Failed to get Storage Group Map in three try.");
+          }
+        }
+      } catch (TException | MetadataException | IOException e) {
+        throw new StatementAnalyzeException(
+            "An error occurred when executing getDeviceToStorageGroup():" + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Adds the given storage group names to the storage group cache under its write lock.
+   *
+   * @param storageGroupNames the storage group names to add
+   */
+  public void updateStorageCache(Set<String> storageGroupNames) {
+    final ReentrantReadWriteLock.WriteLock writeLock = storageGroupCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      storageGroupCache.addAll(storageGroupNames);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Invalidates the given storage groups in the storage group cache.
+   *
+   * @param storageGroupNames the storage groups to remove from the cache
+   */
+  public void removeFromStorageGroupCache(List<String> storageGroupNames) {
+    final ReentrantReadWriteLock.WriteLock writeLock = storageGroupCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      // remove the names one at a time, in list order
+      storageGroupNames.forEach(storageGroupCache::remove);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Invalidates the whole storage group cache under its write lock. */
+  public void removeFromStorageGroupCache() {
+    final ReentrantReadWriteLock.WriteLock writeLock = storageGroupCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      storageGroupCache.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+ // endregion
+
+ // region replicaSet cache
+  /**
+   * Gets the regionReplicaSet of a consensus group from the local map, refreshing the map from
+   * the config node when the group is unknown.
+   *
+   * <p>The value is read with a single {@code get} instead of {@code containsKey} followed by
+   * {@code get}: the map can be cleared concurrently by an update or an invalidation, and the
+   * two-step read could then return null. The map is also re-checked after the monitor is taken,
+   * so threads that waited behind a refresh do not repeat the remote call.
+   *
+   * @param consensusGroupId the id of consensus group
+   * @return regionReplicaSet, never null
+   * @throws RuntimeException if failed to get regionReplicaSet from confignode
+   * @throws StatementAnalyzeException if there are exception when try to get latestRegionRouteMap
+   */
+  public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId consensusGroupId) {
+    TRegionReplicaSet replicaSet = groupIdToReplicaSetMap.get(consensusGroupId);
+    if (replicaSet != null) {
+      return replicaSet;
+    }
+    // try to update latestRegionRegionRouteMap when miss
+    synchronized (groupIdToReplicaSetMap) {
+      // another thread may have refreshed the map while we waited for the monitor
+      replicaSet = groupIdToReplicaSetMap.get(consensusGroupId);
+      if (replicaSet != null) {
+        return replicaSet;
+      }
+      try (ConfigNodeClient client =
+          configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+        TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) {
+          updateGroupIdToReplicaSetMap(resp.getTimestamp(), resp.getRegionRouteMap());
+        }
+        replicaSet = groupIdToReplicaSetMap.get(consensusGroupId);
+        if (replicaSet == null) {
+          // failed to get RegionReplicaSet from confignode
+          throw new RuntimeException(
+              "Failed to get replicaSet of consensus group[id= " + consensusGroupId + "]");
+        }
+        return replicaSet;
+      } catch (IOException | TException e) {
+        throw new StatementAnalyzeException(
+            "An error occurred when executing getRegionReplicaSet():" + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Replaces the regionReplicaSet map when the given timestamp is not older than the latest
+   * update time seen so far; the latest update time is advanced to the maximum of both.
+   *
+   * @param timestamp the timestamp of map that need to update
+   * @param map consensusGroupId to regionReplicaSet map
+   * @return true if the map was replaced, false when the given map is older than the cached one
+   */
+  public boolean updateGroupIdToReplicaSetMap(
+      long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> map) {
+    long newestTimestamp = latestUpdateTime.accumulateAndGet(timestamp, Math::max);
+    if (timestamp != newestTimestamp) {
+      // a newer map has already been applied, keep it
+      return false;
+    }
+    groupIdToReplicaSetMap.clear();
+    groupIdToReplicaSetMap.putAll(map);
+    return true;
+  }
+
+ /**
+ * Invalidates the replicaSet cache by removing every consensusGroupId -> regionReplicaSet
+ * entry. latestUpdateTime is left unchanged.
+ */
+ public void invalidReplicaSetCache() {
+ groupIdToReplicaSetMap.clear();
+ }
+
+ // endregion
+
+ // region schema partition cache
+
+  /**
+   * Builds a SchemaPartition for the given devices purely from the cache.
+   *
+   * <p>The lookup is all-or-nothing: an empty input, an uncached storage group or an uncached
+   * series partition slot each count as a cache miss. Every outcome is recorded in the cache
+   * metrics.
+   *
+   * @param storageGroupToDeviceMap storage group to devices map
+   * @return SchemaPartition of storageGroupToDeviceMap, or null on cache miss
+   */
+  public SchemaPartition getSchemaPartition(Map<String, List<String>> storageGroupToDeviceMap) {
+    schemaPartitionCacheLock.readLock().lock();
+    try {
+      if (storageGroupToDeviceMap.isEmpty()) {
+        CacheMetricsRecorder.record(false, SCHEMA_PARTITION_CACHE_NAME);
+        return null;
+      }
+      Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> resolvedPartitions =
+          new HashMap<>();
+
+      // walk the storage groups one by one
+      for (Map.Entry<String, List<String>> groupEntry : storageGroupToDeviceMap.entrySet()) {
+        String storageGroupName = groupEntry.getKey();
+        Map<TSeriesPartitionSlot, TRegionReplicaSet> slotToReplicaSet =
+            resolvedPartitions.computeIfAbsent(storageGroupName, unused -> new HashMap<>());
+        SchemaPartitionTable cachedTable = schemaPartitionCache.getIfPresent(storageGroupName);
+        if (cachedTable == null) {
+          // an unknown storage group makes the whole lookup a miss
+          logger.debug(
+              "[{} Cache] miss when search storage group {}",
+              SCHEMA_PARTITION_CACHE_NAME,
+              storageGroupName);
+          CacheMetricsRecorder.record(false, SCHEMA_PARTITION_CACHE_NAME);
+          return null;
+        }
+        Map<TSeriesPartitionSlot, TConsensusGroupId> slotToGroupId =
+            cachedTable.getSchemaPartitionMap();
+        // walk the devices of this storage group
+        for (String device : groupEntry.getValue()) {
+          TSeriesPartitionSlot slot = partitionExecutor.getSeriesPartitionSlot(device);
+          if (!slotToGroupId.containsKey(slot)) {
+            // a single unknown device makes the whole lookup a miss
+            logger.debug(
+                "[{} Cache] miss when search device {}", SCHEMA_PARTITION_CACHE_NAME, device);
+            CacheMetricsRecorder.record(false, SCHEMA_PARTITION_CACHE_NAME);
+            return null;
+          }
+          slotToReplicaSet.put(slot, getRegionReplicaSet(slotToGroupId.get(slot)));
+        }
+      }
+      // everything was found in the cache
+      logger.debug("[{} Cache] hit", SCHEMA_PARTITION_CACHE_NAME);
+      CacheMetricsRecorder.record(true, SCHEMA_PARTITION_CACHE_NAME);
+      return new SchemaPartition(
+          resolvedPartitions, seriesSlotExecutorName, seriesPartitionSlotNum);
+    } finally {
+      schemaPartitionCacheLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Merges the given schema partitions into schemaPartitionCache; a cache entry is created for
+   * every storage group that is not cached yet.
+   *
+   * @param schemaPartitionTable storage group to SeriesPartitionSlot to ConsensusGroupId map
+   */
+  public void updateSchemaPartitionCache(
+      Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable) {
+    schemaPartitionCacheLock.writeLock().lock();
+    try {
+      schemaPartitionTable.forEach(
+          (storageGroupName, slotToGroupId) -> {
+            SchemaPartitionTable cachedTable = schemaPartitionCache.getIfPresent(storageGroupName);
+            if (cachedTable == null) {
+              cachedTable = new SchemaPartitionTable();
+              schemaPartitionCache.put(storageGroupName, cachedTable);
+            }
+            // new slots are added, already known slots are overwritten
+            cachedTable.getSchemaPartitionMap().putAll(slotToGroupId);
+          });
+    } finally {
+      schemaPartitionCacheLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Invalidates the schemaPartitionCache entry of one storage group.
+   *
+   * @param storageGroupName the storage group whose cached schema partition is dropped
+   */
+  public void invalidSchemaPartitionCache(String storageGroupName) {
+    final ReentrantReadWriteLock.WriteLock writeLock = schemaPartitionCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      schemaPartitionCache.invalidate(storageGroupName);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Invalidates every entry of schemaPartitionCache under its write lock. */
+  public void invalidAllSchemaPartitionCache() {
+    final ReentrantReadWriteLock.WriteLock writeLock = schemaPartitionCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      schemaPartitionCache.invalidateAll();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+ // endregion
+
+ // region data partition cache
+
+  /**
+   * Builds a DataPartition for the given query params purely from the cache.
+   *
+   * <p>The lookup is all-or-nothing: an empty input or a miss for any storage group counts as a
+   * cache miss. Every outcome is recorded in the cache metrics.
+   *
+   * @param storageGroupToQueryParamsMap storage group to dataPartitionQueryParam map
+   * @return DataPartition of storageGroupToQueryParamsMap, or null on cache miss
+   */
+  public DataPartition getDataPartition(
+      Map<String, List<DataPartitionQueryParam>> storageGroupToQueryParamsMap) {
+    dataPartitionCacheLock.readLock().lock();
+    try {
+      if (storageGroupToQueryParamsMap.isEmpty()) {
+        CacheMetricsRecorder.record(false, DATA_PARTITION_CACHE_NAME);
+        return null;
+      }
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          resolvedPartitions = new HashMap<>();
+      // walk the storage groups one by one, giving up on the first miss
+      for (Map.Entry<String, List<DataPartitionQueryParam>> groupEntry :
+          storageGroupToQueryParamsMap.entrySet()) {
+        boolean groupHit =
+            getStorageGroupDataPartition(
+                resolvedPartitions, groupEntry.getKey(), groupEntry.getValue());
+        if (!groupHit) {
+          CacheMetricsRecorder.record(false, DATA_PARTITION_CACHE_NAME);
+          return null;
+        }
+      }
+      // everything was found in the cache
+      logger.debug("[{} Cache] hit", DATA_PARTITION_CACHE_NAME);
+      CacheMetricsRecorder.record(true, DATA_PARTITION_CACHE_NAME);
+      return new DataPartition(resolvedPartitions, seriesSlotExecutorName, seriesPartitionSlotNum);
+    } finally {
+      dataPartitionCacheLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Resolves the data partitions of one storage group from the cache and stores them in {@code
+   * dataPartitionMap}.
+   *
+   * @param dataPartitionMap result; gets an entry for the storage group once its table is found
+   * @param storageGroupName storage group that need to get
+   * @param dataPartitionQueryParams specific query params of data partition
+   * @return true when every query param hit the cache, false on the first miss
+   */
+  private boolean getStorageGroupDataPartition(
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          dataPartitionMap,
+      String storageGroupName,
+      List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    DataPartitionTable cachedTable = dataPartitionCache.getIfPresent(storageGroupName);
+    if (cachedTable == null) {
+      logger.debug(
+          "[{} Cache] miss when search storage group {}",
+          DATA_PARTITION_CACHE_NAME,
+          storageGroupName);
+      return false;
+    }
+    Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedSeriesTables =
+        cachedTable.getDataPartitionMap();
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+        resolvedSeriesSlots =
+            dataPartitionMap.computeIfAbsent(storageGroupName, unused -> new HashMap<>());
+    // walk the devices of this storage group, giving up on the first miss
+    for (DataPartitionQueryParam queryParam : dataPartitionQueryParams) {
+      boolean deviceHit =
+          getDeviceDataPartition(resolvedSeriesSlots, queryParam, cachedSeriesTables);
+      if (!deviceHit) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Resolves the data partitions of one device from the cache and stores them in {@code
+   * seriesSlotToTimePartitionMap}.
+   *
+   * @param seriesSlotToTimePartitionMap result; gets an entry for the device's series partition
+   *     slot once that slot is found in the cache
+   * @param dataPartitionQueryParam specific query param of data partition
+   * @param cachedStorageGroupPartitionMap all cached data partition map of related storage group
+   * @return true when every requested time partition hit the cache, false on the first miss
+   */
+  private boolean getDeviceDataPartition(
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+          seriesSlotToTimePartitionMap,
+      DataPartitionQueryParam dataPartitionQueryParam,
+      Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedStorageGroupPartitionMap) {
+    TSeriesPartitionSlot slot =
+        partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath());
+    SeriesPartitionTable cachedSeriesTable = cachedStorageGroupPartitionMap.get(slot);
+    if (cachedSeriesTable == null) {
+      logger.debug(
+          "[{} Cache] miss when search device {}",
+          DATA_PARTITION_CACHE_NAME,
+          dataPartitionQueryParam.getDevicePath());
+      return false;
+    }
+    Map<TTimePartitionSlot, List<TConsensusGroupId>> cachedTimeSlots =
+        cachedSeriesTable.getSeriesPartitionMap();
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> resolvedTimeSlots =
+        seriesSlotToTimePartitionMap.computeIfAbsent(slot, unused -> new HashMap<>());
+    // walk the requested time partitions, giving up on the first miss
+    for (TTimePartitionSlot timeSlot : dataPartitionQueryParam.getTimePartitionSlotList()) {
+      boolean timeSlotHit = getTimeSlotDataPartition(resolvedTimeSlots, timeSlot, cachedTimeSlots);
+      if (!timeSlotHit) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Resolves the replica sets of one time partition slot from the cache and stores them in {@code
+   * timePartitionSlotListMap}.
+   *
+   * @param timePartitionSlotListMap result
+   * @param timePartitionSlot the specific time partition slot of data partition
+   * @param cachedTimePartitionSlot all cached time slot map of related device
+   * @return true on hit; false when the slot is not cached or has no consensus group
+   */
+  private boolean getTimeSlotDataPartition(
+      Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotListMap,
+      TTimePartitionSlot timePartitionSlot,
+      Map<TTimePartitionSlot, List<TConsensusGroupId>> cachedTimePartitionSlot) {
+    List<TConsensusGroupId> cachedGroupIds = cachedTimePartitionSlot.get(timePartitionSlot);
+    if (cachedGroupIds == null || cachedGroupIds.isEmpty()) {
+      logger.debug(
+          "[{} Cache] miss when search time partition {}",
+          DATA_PARTITION_CACHE_NAME,
+          timePartitionSlot);
+      return false;
+    }
+    // translate every cached consensus group id into its replica set
+    List<TRegionReplicaSet> replicaSets = new LinkedList<>();
+    for (TConsensusGroupId groupId : cachedGroupIds) {
+      TRegionReplicaSet replicaSet = getRegionReplicaSet(groupId);
+      replicaSets.add(replicaSet);
+    }
+    timePartitionSlotListMap.put(timePartitionSlot, replicaSets);
+    return true;
+  }
+
+  /**
+   * Merges the given data partitions into dataPartitionCache. Storage groups and series
+   * partition slots that are not cached yet are added; for an already cached slot the given time
+   * partitions are merged into the cached ones.
+   *
+   * @param dataPartitionTable storage group to seriesPartitionSlot to timePartitionSlot to
+   *     ConsensusGroupId map
+   */
+  public void updateDataPartitionCache(
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+          dataPartitionTable) {
+    dataPartitionCacheLock.writeLock().lock();
+    try {
+      dataPartitionTable.forEach(
+          (storageGroupName, newSeriesSlots) -> {
+            DataPartitionTable cachedTable = dataPartitionCache.getIfPresent(storageGroupName);
+            if (cachedTable == null) {
+              cachedTable = new DataPartitionTable();
+              dataPartitionCache.put(storageGroupName, cachedTable);
+            }
+            Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedSeriesTables =
+                cachedTable.getDataPartitionMap();
+            newSeriesSlots.forEach(
+                (seriesSlot, newTimeSlots) -> {
+                  if (cachedSeriesTables.containsKey(seriesSlot)) {
+                    // known slot: merge the time partitions into the cached table
+                    cachedSeriesTables
+                        .get(seriesSlot)
+                        .getSeriesPartitionMap()
+                        .putAll(newTimeSlots);
+                  } else {
+                    // unknown slot: cache a new seriesPartitionTable
+                    cachedSeriesTables.put(seriesSlot, new SeriesPartitionTable(newTimeSlots));
+                  }
+                });
+          });
+    } finally {
+      dataPartitionCacheLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Invalidates the dataPartitionCache entry of one storage group.
+   *
+   * @param storageGroup the storage group whose cached data partition is dropped
+   */
+  public void invalidDataPartitionCache(String storageGroup) {
+    final ReentrantReadWriteLock.WriteLock writeLock = dataPartitionCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      dataPartitionCache.invalidate(storageGroup);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Invalidates every entry of dataPartitionCache under its write lock. */
+  public void invalidAllDataPartitionCache() {
+    final ReentrantReadWriteLock.WriteLock writeLock = dataPartitionCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      dataPartitionCache.invalidateAll();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+ // endregion
+
+ /**
+ * Invalidates all four caches in turn: storage group, data partition, schema partition and
+ * replicaSet. Each call takes its own lock, so the caches are not cleared atomically as a
+ * whole.
+ */
+ public void invalidAllCache() {
+ logger.debug("[Partition Cache] invalid");
+ removeFromStorageGroupCache();
+ invalidAllDataPartitionCache();
+ invalidAllSchemaPartitionCache();
+ invalidReplicaSetCache();
+ // log the state after invalidation, rendered through toString()
+ logger.debug("[Partition Cache] is invalid:{}", this);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionCache{"
+ + "cacheSize="
+ + cacheSize
+ + ", storageGroupCache="
+ + storageGroupCache
+ + ", replicaSetCache="
+ + groupIdToReplicaSetMap
+ + ", schemaPartitionCache="
+ + schemaPartitionCache
+ + ", dataPartitionCache="
+ + dataPartitionCache
+ + '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/StorageGroupCacheResult.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/StorageGroupCacheResult.java
new file mode 100644
index 0000000000..29c811d1e0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/StorageGroupCacheResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Collects the outcome of one storage group cache lookup: whether all devices hit, which devices
+ * missed, and the result map that subclasses fill through {@link #put(String, String)}.
+ *
+ * @param <V> the value type of the result map
+ */
+public abstract class StorageGroupCacheResult<V> {
+  /** the result, true until {@link #setFailed()} or {@code setSuccess(false)} is called */
+  private boolean success = true;
+  /** the list of devices that miss */
+  private List<String> missedDevices = new ArrayList<>();
+  /** result map, Notice: this map will be empty when failed */
+  protected Map<String, V> map = new HashMap<>();
+
+  /** @return whether the lookup is (still) considered successful */
+  public boolean isSuccess() {
+    return success;
+  }
+
+  /**
+   * Sets the success flag only; unlike {@link #setFailed()} the result map is left untouched.
+   *
+   * @param success the new value of the flag
+   */
+  public void setSuccess(boolean success) {
+    this.success = success;
+  }
+
+  /** @return the devices recorded as missed since the last {@link #reset()} */
+  public List<String> getMissedDevices() {
+    return missedDevices;
+  }
+
+  /**
+   * Records a device that could not be resolved.
+   *
+   * @param missedDevice the path of the missed device
+   */
+  public void addMissedDevice(String missedDevice) {
+    this.missedDevices.add(missedDevice);
+  }
+
+  /** @return the result map; empty after {@link #setFailed()} or {@link #reset()} */
+  public Map<String, V> getMap() {
+    return map;
+  }
+
+  /**
+   * Adds a resolved device to the result map; the subclass decides how the map is keyed.
+   *
+   * @param device the path of the resolved device
+   * @param storageGroupName the storage group the device belongs to
+   */
+  public abstract void put(String device, String storageGroupName);
+
+  /** set failed and clear the map; the missed devices are kept */
+  public void setFailed() {
+    this.success = false;
+    this.map = new HashMap<>();
+  }
+
+  /** reset storageGroupCacheResult: success again, no missed devices, empty map */
+  public void reset() {
+    this.success = true;
+    this.missedDevices = new ArrayList<>();
+    this.map = new HashMap<>();
+  }
+}
diff --git a/server/src/test/resources/datanode1conf/iotdb-datanode.properties b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
index cd17728791..837faf2187 100644
--- a/server/src/test/resources/datanode1conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
@@ -26,7 +26,7 @@ internal_port=9003
data_region_consensus_port=40010
schema_region_consensus_port=50030
-target_config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
+target_config_nodes=127.0.0.1:22277,127.0.0.1:22279,127.0.0.1:22281
system_dir=target/datanode1/system
data_dirs=target/datanode1/data
diff --git a/server/src/test/resources/datanode2conf/iotdb-datanode.properties b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
index 097e063007..cc5e6394ae 100644
--- a/server/src/test/resources/datanode2conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
@@ -26,7 +26,7 @@ internal_port=9005
data_region_consensus_port=40012
schema_region_consensus_port=50032
-target_config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
+target_config_nodes=127.0.0.1:22277,127.0.0.1:22279,127.0.0.1:22281
system_dir=target/datanode2/system
data_dirs=target/datanode2/data
diff --git a/server/src/test/resources/datanode3conf/iotdb-datanode.properties b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
index 86be90b040..af5c60f450 100644
--- a/server/src/test/resources/datanode3conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
@@ -26,7 +26,7 @@ internal_port=9007
data_region_consensus_port=40014
schema_region_consensus_port=50034
-target_config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
+target_config_nodes=127.0.0.1:22277,127.0.0.1:22279,127.0.0.1:22281
system_dir=target/datanode3/system
data_dirs=target/datanode3/data