You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/27 05:30:13 UTC
[iotdb] branch master updated: [IOTDB-2960]Add partition cache (#5685)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4bf3dbb674 [IOTDB-2960]Add partition cache (#5685)
4bf3dbb674 is described below
commit 4bf3dbb6743ef68f921de03283890fd98d0c09a0
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Wed Apr 27 13:30:08 2022 +0800
[IOTDB-2960]Add partition cache (#5685)
---
.../resources/conf/iotdb-engine.properties | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../mpp/sql/analyze/ClusterPartitionFetcher.java | 446 +++++++++++++++++++--
.../mpp/sql/analyze/FakePartitionFetcherImpl.java | 11 +
.../db/mpp/sql/analyze/IPartitionFetcher.java | 4 +
.../sql/analyze/StandalonePartitionFetcher.java | 11 +
7 files changed, 455 insertions(+), 41 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ac905a0f53..27e06fe926 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -1012,6 +1012,11 @@ timestamp_precision=ms
# Datatype: int
# datanode_schema_cache_size=10000
+# cache size for partition.
+# This cache is used to improve partition fetch from config node.
+# Datatype: int
+# partition_cache_size=10000
+
####################
### Schema File Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 527dd4e42b..edb9790918 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -896,6 +896,12 @@ public class IoTDBConfig {
*/
private int dataNodeSchemaCacheSize = 10000;
+ /**
+ * Cache size of partition cache in {@link
+ * org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher}
+ */
+ private int partitionCacheSize = 10000;
+
public float getUdfMemoryBudgetInMB() {
return udfMemoryBudgetInMB;
}
@@ -2829,4 +2835,12 @@ public class IoTDBConfig {
public void setDataNodeSchemaCacheSize(int dataNodeSchemaCacheSize) {
this.dataNodeSchemaCacheSize = dataNodeSchemaCacheSize;
}
+
+ public int getPartitionCacheSize() {
+ return partitionCacheSize;
+ }
+
+ public void setPartitionCacheSize(int partitionCacheSize) {
+ this.partitionCacheSize = partitionCacheSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bbfd20b6c0..556c03b582 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1598,6 +1598,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"data_block_manager_keep_alive_time_in_ms",
Integer.toString(conf.getDataBlockManagerKeepAliveTimeInMs()))));
+
+ conf.setPartitionCacheSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "partition_cache_size", Integer.toString(conf.getPartitionCacheSize()))));
}
/** Get default encode algorithm by data type */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index b2ecaa612c..49c121742c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -30,30 +30,48 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
public class ClusterPartitionFetcher implements IPartitionFetcher {
-
+ private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final ConfigNodeClient client;
private SeriesPartitionExecutor partitionExecutor;
+ private PartitionCache partitionCache;
+
private static final class ClusterPartitionFetcherHolder {
private static final ClusterPartitionFetcher INSTANCE = new ClusterPartitionFetcher();
@@ -70,59 +88,57 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
} catch (IoTDBConnectionException | BadNodeUrlException e) {
throw new StatementAnalyzeException("Couldn't connect config node");
}
- initPartitionExecutor();
- }
-
- private void initPartitionExecutor() {
- // TODO: @crz move to node-commons
- IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
- try {
- Class<?> executor = Class.forName(conf.getSeriesPartitionExecutorClass());
- Constructor<?> executorConstructor = executor.getConstructor(int.class);
- this.partitionExecutor =
- (SeriesPartitionExecutor)
- executorConstructor.newInstance(conf.getSeriesPartitionSlotNum());
- } catch (ClassNotFoundException
- | NoSuchMethodException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException e) {
- throw new StatementAnalyzeException(
- String.format(
- "Couldn't Constructor SeriesPartitionExecutor class: %s",
- conf.getSeriesPartitionExecutorClass()));
- }
+ this.partitionExecutor =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+ config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
+ this.partitionCache =
+ new PartitionCache(
+ config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
}
@Override
public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
try {
- TSchemaPartitionResp schemaPartitionResp =
- client.getSchemaPartition(constructSchemaPartitionReq(patternTree));
- if (schemaPartitionResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return parseSchemaPartitionResp(schemaPartitionResp);
+ patternTree.constructTree();
+ List<String> devicePaths = patternTree.findAllDevicePaths();
+ Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths);
+ SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+ if (null == schemaPartition) {
+ TSchemaPartitionResp schemaPartitionResp =
+ client.getSchemaPartition(constructSchemaPartitionReq(patternTree));
+ if (schemaPartitionResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
+ partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+ }
}
+ return schemaPartition;
} catch (IoTDBConnectionException e) {
throw new StatementAnalyzeException("An error occurred when executing getSchemaPartition()");
}
- return null;
}
@Override
public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree) {
try {
- TSchemaPartitionResp schemaPartitionResp =
- client.getOrCreateSchemaPartition(constructSchemaPartitionReq(patternTree));
- if (schemaPartitionResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return parseSchemaPartitionResp(schemaPartitionResp);
+ patternTree.constructTree();
+ List<String> devicePaths = patternTree.findAllDevicePaths();
+ Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths);
+ SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+ if (null == schemaPartition) {
+ TSchemaPartitionResp schemaPartitionResp =
+ client.getOrCreateSchemaPartition(constructSchemaPartitionReq(patternTree));
+ if (schemaPartitionResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
+ partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+ }
}
+ return schemaPartition;
} catch (IoTDBConnectionException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getOrCreateSchemaPartition()");
}
- return null;
}
@Override
@@ -140,6 +156,27 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
return null;
}
+ @Override
+ public DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ try {
+ Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
+ splitDataPartitionQueryParam(dataPartitionQueryParams);
+ DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
+ if (null == dataPartition) {
+ TDataPartitionResp dataPartitionResp =
+ client.getDataPartition(constructDataPartitionReq(splitDataPartitionQueryParams));
+ if (dataPartitionResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataPartition = parseDataPartitionResp(dataPartitionResp);
+ partitionCache.updateDataPartitionCache(dataPartition);
+ }
+ }
+ return dataPartition;
+ } catch (IoTDBConnectionException e) {
+ throw new StatementAnalyzeException("An error occurred when executing getDataPartition()");
+ }
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
@@ -156,6 +193,101 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
return null;
}
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ try {
+ Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
+ splitDataPartitionQueryParam(dataPartitionQueryParams);
+ DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
+ if (null == dataPartition) {
+ TDataPartitionResp dataPartitionResp =
+ client.getOrCreateDataPartition(
+ constructDataPartitionReq(splitDataPartitionQueryParams));
+ if (dataPartitionResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataPartition = parseDataPartitionResp(dataPartitionResp);
+ partitionCache.updateDataPartitionCache(dataPartition);
+ }
+ }
+ return dataPartition;
+ } catch (IoTDBConnectionException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getOrCreateDataPartition()");
+ }
+ }
+
+ /** get deviceToStorageGroup map */
+ private Map<String, String> getDeviceToStorageGroup(List<String> devicePaths) {
+ Map<String, String> deviceToStorageGroup = new HashMap<>();
+ // first try to hit cache
+ if (!partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup)) {
+ List<String> storageGroupPathPattern = Arrays.asList("root", "**");
+ try {
+ TStorageGroupSchemaResp storageGroupSchemaResp =
+ client.getMatchedStorageGroupSchemas(storageGroupPathPattern);
+ if (storageGroupSchemaResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Set<String> storageGroupNames =
+ storageGroupSchemaResp.getStorageGroupSchemaMap().keySet();
+ // update all storage group into cache
+ partitionCache.updateStorageCache(storageGroupNames);
+ // second try to hit cache
+ deviceToStorageGroup = new HashMap<>();
+ if (!partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup)) {
+ // try to auto create storage group
+ Set<String> storageGroupNamesNeedCreated = new HashSet<>();
+ for (String devicePath : devicePaths) {
+ if (!deviceToStorageGroup.containsKey(devicePath)) {
+ PartialPath storageGroupNameNeedCreated =
+ MetaUtils.getStorageGroupPathByLevel(
+ new PartialPath(devicePath), config.getDefaultStorageGroupLevel());
+ storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
+ }
+ }
+ for (String storageGroupName : storageGroupNamesNeedCreated) {
+ TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
+ storageGroupSchema.setName(storageGroupName);
+ TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
+ client.setStorageGroup(req);
+ }
+ partitionCache.updateStorageCache(storageGroupNamesNeedCreated);
+ // third try to hit cache
+ deviceToStorageGroup = new HashMap<>();
+ if (!partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup)) {
+ throw new StatementAnalyzeException(
+ "Failed to get Storage Group Map when executing getOrCreateDataPartition()");
+ }
+ }
+ }
+ } catch (IoTDBConnectionException | MetadataException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getOrCreateDataPartition()");
+ }
+ }
+ return deviceToStorageGroup;
+ }
+
+ /** split data partition query param by storage group */
+ private Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParam(
+ List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ List<String> devicePaths = new ArrayList<>();
+ for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
+ devicePaths.add(dataPartitionQueryParam.getDevicePath());
+ }
+ Map<String, String> deviceToStorageGroup = getDeviceToStorageGroup(devicePaths);
+
+ Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
+ for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
+ String storageGroup = deviceToStorageGroup.get(dataPartitionQueryParam.getDevicePath());
+ if (!result.containsKey(storageGroup)) {
+ result.put(storageGroup, new ArrayList<>());
+ }
+ result.get(storageGroup).add(dataPartitionQueryParam);
+ }
+ return result;
+ }
+
private TSchemaPartitionReq constructSchemaPartitionReq(PathPatternTree patternTree) {
PublicBAOS baos = new PublicBAOS();
try {
@@ -208,8 +340,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
return new SchemaPartition(
schemaPartitionMap,
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ config.getSeriesPartitionExecutorClass(),
+ config.getSeriesPartitionSlotNum());
}
private DataPartition parseDataPartitionResp(TDataPartitionResp dataPartitionResp) {
@@ -245,7 +377,239 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
return new DataPartition(
dataPartitionMap,
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ config.getSeriesPartitionExecutorClass(),
+ config.getSeriesPartitionSlotNum());
+ }
+
+ private class PartitionCache {
+ /** the size of partitionCache */
+ private final int cacheSize = config.getPartitionCacheSize();
+ /** the cache of storage group */
+ private Set<String> storageGroupCache = Collections.synchronizedSet(new HashSet<>());
+ /** device -> tRegionReplicaSet */
+ private final Cache<String, TRegionReplicaSet> schemaPartitionCache;
+ /** tSeriesPartitionSlot, tTimePartitionSlot -> TRegionReplicaSets */
+ private final Cache<DataPartitionCacheKey, List<TRegionReplicaSet>> dataPartitionCache;
+ /** calculate slotId by device */
+ private final String seriesSlotExecutorName;
+
+ private final int seriesPartitionSlotNum;
+
+ public PartitionCache(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
+ this.seriesSlotExecutorName = seriesSlotExecutorName;
+ this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+ this.schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+ this.dataPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+ }
+
+ /** get storage group by cache */
+ public boolean getStorageGroup(
+ List<String> devicePaths, Map<String, String> deviceToStorageGroupMap) {
+ if (storageGroupCache.size() == 0) {
+ logger.debug("Failed to get storage group");
+ return false;
+ }
+ for (String devicePath : devicePaths) {
+ for (String storageGroup : storageGroupCache) {
+ if (devicePath.startsWith(storageGroup)) {
+ deviceToStorageGroupMap.put(devicePath, storageGroup);
+ } else {
+ logger.debug("Failed to get storage group cache");
+ return false;
+ }
+ }
+ }
+ logger.debug("Hit storage group");
+ return true;
+ }
+
+ /** update the cache of storage group */
+ public void updateStorageCache(Set<String> storageGroupNames) {
+ for (String storageGroupName : storageGroupNames) {
+ if (!storageGroupCache.contains(storageGroupName)) {
+ storageGroupCache.add(storageGroupName);
+ }
+ }
+ }
+
+ /** invalidate storage group cache entries after storage group deletion */
+ public void invalidStorageGroupCache(List<String> storageGroupNames) {
+ for (String storageGroupName : storageGroupNames) {
+ if (storageGroupCache.contains(storageGroupName)) {
+ storageGroupCache.remove(storageGroupName);
+ }
+ }
+ }
+
+ /** get schemaPartition from cache by deviceToStorageGroupMap */
+ public SchemaPartition getSchemaPartition(Map<String, String> deviceToStorageGroupMap) {
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
+ new HashMap<>();
+ // check cache for each device
+ for (Map.Entry<String, String> entry : deviceToStorageGroupMap.entrySet()) {
+ String device = entry.getKey();
+ TSeriesPartitionSlot seriesPartitionSlot = partitionExecutor.getSeriesPartitionSlot(device);
+ TRegionReplicaSet regionReplicaSet = schemaPartitionCache.getIfPresent(device);
+ if (null == regionReplicaSet) {
+ // if one device is not found, return cache miss.
+ logger.debug("Failed to find schema partition");
+ return null;
+ }
+ String storageGroupName = deviceToStorageGroupMap.get(device);
+ if (!schemaPartitionMap.containsKey(storageGroupName)) {
+ schemaPartitionMap.put(storageGroupName, new HashMap<>());
+ }
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> regionReplicaSetMap =
+ schemaPartitionMap.get(storageGroupName);
+ regionReplicaSetMap.put(seriesPartitionSlot, regionReplicaSet);
+ }
+ logger.debug("Hit schema partition");
+ // cache hit
+ return new SchemaPartition(
+ schemaPartitionMap, seriesSlotExecutorName, seriesPartitionSlotNum);
+ }
+
+ /** get dataPartition by query param map */
+ public DataPartition getDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ // check cache for each storage group
+ for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+ sgNameToQueryParamsMap.entrySet()) {
+ String storageGroupName = entry.getKey();
+ if (!dataPartitionMap.containsKey(storageGroupName)) {
+ dataPartitionMap.put(storageGroupName, new HashMap<>());
+ }
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+ seriesSlotToTimePartitionMap = dataPartitionMap.get(storageGroupName);
+ // check cache for each query param
+ for (DataPartitionQueryParam dataPartitionQueryParam : entry.getValue()) {
+ if (null == dataPartitionQueryParam.getTimePartitionSlotList()
+ || 0 == dataPartitionQueryParam.getTimePartitionSlotList().size()) {
+ // if querying all data (no time partition slots given), treat as cache miss
+ logger.debug("Failed to find data partition");
+ return null;
+ }
+ TSeriesPartitionSlot seriesPartitionSlot =
+ partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath());
+ if (!seriesSlotToTimePartitionMap.containsKey(seriesPartitionSlot)) {
+ seriesSlotToTimePartitionMap.put(seriesPartitionSlot, new HashMap<>());
+ }
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotListMap =
+ seriesSlotToTimePartitionMap.get(seriesPartitionSlot);
+ // check cache for each time partition
+ for (TTimePartitionSlot timePartitionSlot :
+ dataPartitionQueryParam.getTimePartitionSlotList()) {
+ DataPartitionCacheKey dataPartitionCacheKey =
+ new DataPartitionCacheKey(seriesPartitionSlot, timePartitionSlot);
+ List<TRegionReplicaSet> regionReplicaSets =
+ dataPartitionCache.getIfPresent(dataPartitionCacheKey);
+ if (null == regionReplicaSets) {
+ // if one time partition slot is not found, cache miss
+ logger.debug("Failed to find data partition");
+ return null;
+ }
+ timePartitionSlotListMap.put(timePartitionSlot, regionReplicaSets);
+ }
+ }
+ }
+ logger.debug("Hit data partition");
+ // cache hit
+ return new DataPartition(dataPartitionMap, seriesSlotExecutorName, seriesPartitionSlotNum);
+ }
+
+ /** update schemaPartitionCache by schemaPartition. */
+ public void updateSchemaPartitionCache(List<String> devices, SchemaPartition schemaPartition) {
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> storageGroupPartitionMap =
+ schemaPartition.getSchemaPartitionMap();
+ Set<String> storageGroupNames = storageGroupPartitionMap.keySet();
+ for (String device : devices) {
+ String storageGroup = null;
+ for (String storageGroupName : storageGroupNames) {
+ if (device.startsWith(storageGroupName)) {
+ storageGroup = storageGroupName;
+ break;
+ }
+ }
+ if (null == storageGroup) {
+ logger.error(
+ "Failed to get the storage group of {} when update SchemaPartitionCache", device);
+ continue;
+ }
+ TSeriesPartitionSlot seriesPartitionSlot = partitionExecutor.getSeriesPartitionSlot(device);
+ TRegionReplicaSet regionReplicaSet =
+ storageGroupPartitionMap.get(storageGroup).getOrDefault(seriesPartitionSlot, null);
+ if (null == regionReplicaSet) {
+ logger.error(
+ "Failed to get the regionReplicaSet of {} when update SchemaPartitionCache", device);
+ continue;
+ }
+ schemaPartitionCache.put(device, regionReplicaSet);
+ }
+ }
+
+ /** update dataPartitionCache by dataPartition */
+ public void updateDataPartitionCache(DataPartition dataPartition) {
+ for (Map.Entry<
+ String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ entry1 : dataPartition.getDataPartitionMap().entrySet()) {
+ String storageGroup = entry1.getKey();
+ for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+ entry2 : entry1.getValue().entrySet()) {
+ TSeriesPartitionSlot seriesPartitionSlot = entry2.getKey();
+ for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry3 :
+ entry2.getValue().entrySet()) {
+ DataPartitionCacheKey dataPartitionCacheKey =
+ new DataPartitionCacheKey(seriesPartitionSlot, entry3.getKey());
+ dataPartitionCache.put(dataPartitionCacheKey, entry3.getValue());
+ }
+ }
+ }
+ }
+
+ /** invalidate schemaPartitionCache by device */
+ public void invalidSchemaPartitionCache(String device) {
+ // TODO should be called in two situation: 1. redirect status 2. config node trigger
+ schemaPartitionCache.invalidate(device);
+ }
+
+ /** invalidate dataPartitionCache by seriesPartitionSlot and timePartitionSlot */
+ public void invalidDataPartitionCache(
+ TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
+ // TODO should be called in two situation: 1. redirect status 2. config node trigger
+ DataPartitionCacheKey dataPartitionCacheKey =
+ new DataPartitionCacheKey(seriesPartitionSlot, timePartitionSlot);
+ dataPartitionCache.invalidate(dataPartitionCacheKey);
+ }
+ }
+
+ private class DataPartitionCacheKey {
+ private TSeriesPartitionSlot seriesPartitionSlot;
+ private TTimePartitionSlot timePartitionSlot;
+
+ public DataPartitionCacheKey(
+ TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
+ this.seriesPartitionSlot = seriesPartitionSlot;
+ this.timePartitionSlot = timePartitionSlot;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DataPartitionCacheKey that = (DataPartitionCacheKey) o;
+ return Objects.equals(seriesPartitionSlot, that.seriesPartitionSlot)
+ && Objects.equals(timePartitionSlot, that.timePartitionSlot);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(seriesPartitionSlot, timePartitionSlot);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
index eb0d8bec34..a60acbb810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -139,9 +139,20 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher {
return dataPartition;
}
+ @Override
+ public DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ return null;
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
return null;
}
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
index aa00f68f0b..c49d3ea28c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
@@ -34,6 +34,10 @@ public interface IPartitionFetcher {
DataPartition getDataPartition(Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
+ DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams);
+
DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
+
+ DataPartition getOrCreateDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index ceacc51b6c..9cc05807b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -108,9 +108,20 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
}
}
+ @Override
+ public DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ return null;
+ }
+
@Override
public DataPartition getOrCreateDataPartition(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
return getDataPartition(sgNameToQueryParamsMap);
}
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ return null;
+ }
}