You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/12 09:02:25 UTC

[iotdb] branch master updated: [IOTDB-3352][IOTDB-3782] Fix the cache problem in PartitionCache and Optimize. (#6624)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 203b15bae6 [IOTDB-3352][IOTDB-3782] Fix the cache problem in PartitionCache and Optimize. (#6624)
203b15bae6 is described below

commit 203b15bae6a007416f5ad8bd40f64571b238a5d4
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Tue Jul 12 17:02:18 2022 +0800

    [IOTDB-3352][IOTDB-3782] Fix the cache problem in PartitionCache and Optimize. (#6624)
---
 .../resources/conf/iotdb-datanode.properties       |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  | 610 +++-------------
 .../db/mpp/plan/analyze/cache/PartitionCache.java  | 786 +++++++++++++++++++++
 .../analyze/cache/StorageGroupCacheResult.java     |  68 ++
 .../datanode1conf/iotdb-datanode.properties        |   2 +-
 .../datanode2conf/iotdb-datanode.properties        |   2 +-
 .../datanode3conf/iotdb-datanode.properties        |   2 +-
 8 files changed, 971 insertions(+), 503 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 65355f51ad..0bbdb3c64e 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1044,7 +1044,7 @@ timestamp_precision=ms
 # cache size for partition.
 # This cache is used to improve partition fetch from config node.
 # Datatype: int
-# partition_cache_size=0
+# partition_cache_size=1000
 
 ####################
 ### Schema File Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a2c8ce8faa..697d87469b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -905,7 +905,7 @@ public class IoTDBConfig {
    * Cache size of partition cache in {@link
    * org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher}
    */
-  private int partitionCacheSize = 0;
+  private int partitionCacheSize = 1000;
 
   /** Cache size of user and role */
   private int authorCacheSize = 100;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 1e1e5c00f0..6d69c095bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -20,44 +20,34 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
-import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
-import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.service.metrics.recorder.CacheMetricsRecorder;
+import org.apache.iotdb.db.mpp.plan.analyze.cache.PartitionCache;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,23 +55,15 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
   private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final List<String> ROOT_PATH = Arrays.asList("root", "**");
 
   private final SeriesPartitionExecutor partitionExecutor;
 
@@ -105,9 +87,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     this.partitionExecutor =
         SeriesPartitionExecutor.getSeriesPartitionExecutor(
             config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
-    this.partitionCache =
-        new PartitionCache(
-            config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
+    this.partitionCache = new PartitionCache();
   }
 
   @Override
@@ -116,15 +96,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
       patternTree.constructTree();
       List<String> devicePaths = patternTree.getAllDevicePatterns();
-      Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths, false);
-      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+      Map<String, List<String>> storageGroupToDeviceMap =
+          partitionCache.getStorageGroupToDevice(devicePaths, false);
+      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
       if (null == schemaPartition) {
-        TSchemaPartitionResp schemaPartitionResp =
-            client.getSchemaPartition(constructSchemaPartitionReq(patternTree));
-        if (schemaPartitionResp.getStatus().getCode()
+        TSchemaPartitionTableResp schemaPartitionTableResp =
+            client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
+        if (schemaPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
-          partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+          schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
+          partitionCache.updateSchemaPartitionCache(
+              schemaPartitionTableResp.getSchemaPartitionTable());
+        } else {
+          throw new RuntimeException(
+              new IoTDBException(
+                  schemaPartitionTableResp.getStatus().getMessage(),
+                  schemaPartitionTableResp.getStatus().getCode()));
         }
       }
       return schemaPartition;
@@ -140,20 +127,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
       patternTree.constructTree();
       List<String> devicePaths = patternTree.getAllDevicePatterns();
-      Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths, true);
-      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+      Map<String, List<String>> storageGroupToDeviceMap =
+          partitionCache.getStorageGroupToDevice(devicePaths, true);
+      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
       if (null == schemaPartition) {
-        TSchemaPartitionResp schemaPartitionResp =
-            client.getOrCreateSchemaPartition(constructSchemaPartitionReq(patternTree));
-        if (schemaPartitionResp.getStatus().getCode()
+        TSchemaPartitionTableResp schemaPartitionTableResp =
+            client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
+        if (schemaPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
-          partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+          schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
+          partitionCache.updateSchemaPartitionCache(
+              schemaPartitionTableResp.getSchemaPartitionTable());
         } else {
           throw new RuntimeException(
               new IoTDBException(
-                  schemaPartitionResp.getStatus().getMessage(),
-                  schemaPartitionResp.getStatus().getCode()));
+                  schemaPartitionTableResp.getStatus().getMessage(),
+                  schemaPartitionTableResp.getStatus().getCode()));
         }
       }
       return schemaPartition;
@@ -185,16 +174,21 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     try (ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      TDataPartitionResp dataPartitionResp =
-          client.getDataPartition(constructDataPartitionReq(sgNameToQueryParamsMap));
-      if (dataPartitionResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        return parseDataPartitionResp(dataPartitionResp);
+      TDataPartitionTableResp dataPartitionTableResp =
+          client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+
+      if (dataPartitionTableResp.getStatus().getCode()
+          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return parseDataPartitionResp(dataPartitionTableResp);
+      } else {
+        throw new StatementAnalyzeException(
+            "An error occurred when executing getDataPartition():"
+                + dataPartitionTableResp.getStatus().getMessage());
       }
     } catch (TException | IOException e) {
       throw new StatementAnalyzeException(
           "An error occurred when executing getDataPartition():" + e.getMessage());
     }
-    return null;
   }
 
   @Override
@@ -205,12 +199,18 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
           splitDataPartitionQueryParam(dataPartitionQueryParams, false);
       DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
       if (null == dataPartition) {
-        TDataPartitionResp dataPartitionResp =
-            client.getDataPartition(constructDataPartitionReq(splitDataPartitionQueryParams));
-        if (dataPartitionResp.getStatus().getCode()
+        TDataPartitionTableResp dataPartitionTableResp =
+            client.getDataPartitionTable(constructDataPartitionReq(splitDataPartitionQueryParams));
+
+        if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          dataPartition = parseDataPartitionResp(dataPartitionResp);
-          partitionCache.updateDataPartitionCache(dataPartition);
+          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+          partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+        } else {
+          throw new RuntimeException(
+              new IoTDBException(
+                  dataPartitionTableResp.getStatus().getMessage(),
+                  dataPartitionTableResp.getStatus().getCode()));
         }
       }
       return dataPartition;
@@ -226,14 +226,15 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     // Do not use data partition cache
     try (ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      TDataPartitionResp dataPartitionResp =
-          client.getOrCreateDataPartition(constructDataPartitionReq(sgNameToQueryParamsMap));
-      if (dataPartitionResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        return parseDataPartitionResp(dataPartitionResp);
+      TDataPartitionTableResp dataPartitionTableResp =
+          client.getOrCreateDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+      if (dataPartitionTableResp.getStatus().getCode()
+          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return parseDataPartitionResp(dataPartitionTableResp);
       } else {
         throw new StatementAnalyzeException(
             "An error occurred when executing getOrCreateDataPartition():"
-                + dataPartitionResp.getStatus().getMessage());
+                + dataPartitionTableResp.getStatus().getMessage());
       }
     } catch (TException | IOException e) {
       throw new StatementAnalyzeException(
@@ -250,13 +251,19 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
           splitDataPartitionQueryParam(dataPartitionQueryParams, true);
       DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
       if (null == dataPartition) {
-        TDataPartitionResp dataPartitionResp =
-            client.getOrCreateDataPartition(
+        TDataPartitionTableResp dataPartitionTableResp =
+            client.getOrCreateDataPartitionTable(
                 constructDataPartitionReq(splitDataPartitionQueryParams));
-        if (dataPartitionResp.getStatus().getCode()
+
+        if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          dataPartition = parseDataPartitionResp(dataPartitionResp);
-          partitionCache.updateDataPartitionCache(dataPartition);
+          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+          partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+        } else {
+          throw new RuntimeException(
+              new IoTDBException(
+                  dataPartitionTableResp.getStatus().getMessage(),
+                  dataPartitionTableResp.getStatus().getCode()));
         }
       }
       return dataPartition;
@@ -268,86 +275,12 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
 
   @Override
   public boolean updateRegionCache(TRegionRouteReq req) {
-    return partitionCache.updateGroupIdToReplicaSetMap(req);
+    return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), req.getRegionRouteMap());
   }
 
   @Override
   public void invalidAllCache() {
-    logger.debug("Invalidate partition cache");
-    partitionCache.storageGroupCache.clear();
-    partitionCache.invalidAllDataPartitionCache();
-    partitionCache.invalidAllSchemaPartitionCache();
-    logger.debug("PartitionCache is invalid:{}", partitionCache);
-  }
-
-  /** get deviceToStorageGroup map */
-  private Map<String, String> getDeviceToStorageGroup(
-      List<String> devicePaths, boolean isAutoCreate) {
-    Map<String, String> deviceToStorageGroup = new HashMap<>();
-    // miss when devicePath contains *
-    for (String devicePath : devicePaths) {
-      if (devicePath.contains("*")) {
-        return deviceToStorageGroup;
-      }
-    }
-    // first try to hit cache
-    boolean firstTryResult = partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup);
-    if (!firstTryResult) {
-      try (ConfigNodeClient client =
-          configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-        TStorageGroupSchemaResp storageGroupSchemaResp =
-            client.getMatchedStorageGroupSchemas(ROOT_PATH);
-        if (storageGroupSchemaResp.getStatus().getCode()
-            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          Set<String> storageGroupNames =
-              storageGroupSchemaResp.getStorageGroupSchemaMap().keySet();
-          // update all storage group into cache
-          partitionCache.updateStorageCache(storageGroupNames);
-          // second try to hit cache
-          deviceToStorageGroup = new HashMap<>();
-          boolean secondTryResult =
-              partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup);
-          if (!secondTryResult && isAutoCreate) {
-            // try to auto create storage group
-            Set<String> storageGroupNamesNeedCreated = new HashSet<>();
-            for (String devicePath : devicePaths) {
-              if (!deviceToStorageGroup.containsKey(devicePath)) {
-                PartialPath storageGroupNameNeedCreated =
-                    MetaUtils.getStorageGroupPathByLevel(
-                        new PartialPath(devicePath), config.getDefaultStorageGroupLevel());
-                storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
-              }
-            }
-            Set<String> successFullyCreatedStorageGroup = new HashSet<>();
-            for (String storageGroupName : storageGroupNamesNeedCreated) {
-              TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
-              storageGroupSchema.setName(storageGroupName);
-              TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
-              TSStatus tsStatus = client.setStorageGroup(req);
-              if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
-                successFullyCreatedStorageGroup.add(storageGroupName);
-              } else {
-                partitionCache.updateStorageCache(successFullyCreatedStorageGroup);
-                throw new RuntimeException(new IoTDBException(tsStatus.message, tsStatus.code));
-              }
-            }
-            partitionCache.updateStorageCache(storageGroupNamesNeedCreated);
-            // third try to hit cache
-            deviceToStorageGroup = new HashMap<>();
-            boolean thirdTryResult =
-                partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup);
-            if (!thirdTryResult) {
-              throw new StatementAnalyzeException(
-                  "Failed to get Storage Group Map when executing getOrCreateDataPartition()");
-            }
-          }
-        }
-      } catch (TException | MetadataException | IOException e) {
-        throw new StatementAnalyzeException(
-            "An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
-      }
-    }
-    return deviceToStorageGroup;
+    partitionCache.invalidAllCache();
   }
 
   /** split data partition query param by storage group */
@@ -357,13 +290,13 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
       devicePaths.add(dataPartitionQueryParam.getDevicePath());
     }
-    Map<String, String> deviceToStorageGroup = getDeviceToStorageGroup(devicePaths, isAutoCreate);
-
+    Map<String, String> deviceToStorageGroupMap =
+        partitionCache.getDeviceToStorageGroup(devicePaths, isAutoCreate);
     Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
     for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
       String devicePath = dataPartitionQueryParam.getDevicePath();
-      if (deviceToStorageGroup.containsKey(devicePath)) {
-        String storageGroup = deviceToStorageGroup.get(devicePath);
+      if (deviceToStorageGroupMap.containsKey(devicePath)) {
+        String storageGroup = deviceToStorageGroupMap.get(devicePath);
         if (!result.containsKey(storageGroup)) {
           result.put(storageGroup, new ArrayList<>());
         }
@@ -432,9 +365,23 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     return new TDataPartitionReq(partitionSlotsMap);
   }
 
-  private SchemaPartition parseSchemaPartitionResp(TSchemaPartitionResp schemaPartitionResp) {
+  private SchemaPartition parseSchemaPartitionTableResp(
+      TSchemaPartitionTableResp schemaPartitionTableResp) {
+    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> regionReplicaMap = new HashMap<>();
+    for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> entry1 :
+        schemaPartitionTableResp.getSchemaPartitionTable().entrySet()) {
+      Map<TSeriesPartitionSlot, TRegionReplicaSet> result1 =
+          regionReplicaMap.computeIfAbsent(entry1.getKey(), k -> new HashMap<>());
+      for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> entry2 :
+          entry1.getValue().entrySet()) {
+        TSeriesPartitionSlot seriesPartitionSlot = entry2.getKey();
+        TConsensusGroupId consensusGroupId = entry2.getValue();
+        result1.put(seriesPartitionSlot, partitionCache.getRegionReplicaSet(consensusGroupId));
+      }
+    }
+
     return new SchemaPartition(
-        schemaPartitionResp.getSchemaRegionMap(),
+        regionReplicaMap,
         IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
         IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
   }
@@ -448,365 +395,32 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         schemaNodeManagementResp.getMatchedNode());
   }
 
-  private DataPartition parseDataPartitionResp(TDataPartitionResp dataPartitionResp) {
-    return new DataPartition(
-        dataPartitionResp.getDataPartitionMap(),
-        config.getSeriesPartitionExecutorClass(),
-        config.getSeriesPartitionSlotNum());
-  }
-
-  private class PartitionCache {
-    /** the size of partitionCache */
-    private final int cacheSize = config.getPartitionCacheSize();
-    /** the cache of storage group */
-    private final Set<String> storageGroupCache = Collections.synchronizedSet(new HashSet<>());
-    /** the lock of storage group cache */
-    private final ReentrantReadWriteLock storageGroupCacheLock = new ReentrantReadWriteLock();
-    /** device -> regionReplicaSet */
-    private final Cache<String, TRegionReplicaSet> schemaPartitionCache;
-    /** the lock of schemaPartition cache * */
-    private final ReentrantReadWriteLock schemaPartitionCacheLock = new ReentrantReadWriteLock();
-    /** seriesPartitionSlot, timesereisPartitionSlot -> regionReplicaSets * */
-    private final Cache<DataPartitionCacheKey, List<TRegionReplicaSet>> dataPartitionCache;
-    /** the lock of dataPartition cache */
-    private final ReentrantReadWriteLock dataPartitionCacheLock = new ReentrantReadWriteLock();
-    /** calculate slotId by device */
-    private final String seriesSlotExecutorName;
-
-    private final int seriesPartitionSlotNum;
-
-    /** the latest time when groupIdToReplicaSetMap updated. */
-    private final AtomicLong latestUpdateTime = new AtomicLong(0);
-    /** TConsensusGroupId -> TRegionReplicaSet */
-    private final Map<TConsensusGroupId, TRegionReplicaSet> groupIdToReplicaSetMap =
-        new ConcurrentHashMap<>();
-
-    public PartitionCache(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
-      this.seriesSlotExecutorName = seriesSlotExecutorName;
-      this.seriesPartitionSlotNum = seriesPartitionSlotNum;
-      this.schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
-      this.dataPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
-    }
-
-    /** get storage group by cache */
-    public boolean getStorageGroup(
-        List<String> devicePaths, Map<String, String> deviceToStorageGroupMap) {
-      storageGroupCacheLock.readLock().lock();
-      try {
-        boolean result = true;
-        if (storageGroupCache.size() == 0) {
-          logger.debug("Failed to get storage group");
-          result = false;
-        } else {
-          for (String devicePath : devicePaths) {
-            boolean hit = false;
-            synchronized (storageGroupCache) {
-              for (String storageGroup : storageGroupCache) {
-                if (PathUtils.isStartWith(devicePath, storageGroup)) {
-                  deviceToStorageGroupMap.put(devicePath, storageGroup);
-                  hit = true;
-                  break;
-                }
-              }
-            }
-            if (!hit) {
-              logger.debug("{} cannot hit storage group cache", devicePath);
-              result = false;
-              break;
-            }
+  private DataPartition parseDataPartitionResp(TDataPartitionTableResp dataPartitionTableResp) {
+    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+        regionReplicaSet = new HashMap<>();
+    for (Map.Entry<
+            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+        entry1 : dataPartitionTableResp.getDataPartitionTable().entrySet()) {
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> result1 =
+          regionReplicaSet.computeIfAbsent(entry1.getKey(), k -> new HashMap<>());
+      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
+          entry2 : entry1.getValue().entrySet()) {
+        Map<TTimePartitionSlot, List<TRegionReplicaSet>> result2 =
+            result1.computeIfAbsent(entry2.getKey(), k -> new HashMap<>());
+        for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry3 :
+            entry2.getValue().entrySet()) {
+          List<TRegionReplicaSet> regionReplicaSets = new LinkedList<>();
+          for (TConsensusGroupId consensusGroupId : entry3.getValue()) {
+            regionReplicaSets.add(partitionCache.getRegionReplicaSet(consensusGroupId));
           }
+          result2.put(entry3.getKey(), regionReplicaSets);
         }
-        CacheMetricsRecorder.record(result, "StorageGroup");
-        return result;
-      } finally {
-        storageGroupCacheLock.readLock().unlock();
       }
     }
 
-    /** update the cache of storage group */
-    public void updateStorageCache(Set<String> storageGroupNames) {
-      storageGroupCacheLock.writeLock().lock();
-      try {
-        storageGroupCache.addAll(storageGroupNames);
-      } finally {
-        storageGroupCacheLock.writeLock().unlock();
-      }
-    }
-
-    /** invalid storage group after delete */
-    public void invalidStorageGroupCache(List<String> storageGroupNames) {
-      storageGroupCacheLock.writeLock().lock();
-      try {
-        for (String storageGroupName : storageGroupNames) {
-          storageGroupCache.remove(storageGroupName);
-        }
-      } finally {
-        storageGroupCacheLock.writeLock().unlock();
-      }
-    }
-
-    /** get schemaPartition by patternTree */
-    public SchemaPartition getSchemaPartition(Map<String, String> deviceToStorageGroupMap) {
-      schemaPartitionCacheLock.readLock().lock();
-      try {
-        String name = "SchemaPartition";
-        if (deviceToStorageGroupMap.size() == 0) {
-          CacheMetricsRecorder.record(false, name);
-          return null;
-        }
-        Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
-            new HashMap<>();
-        // check cache for each device
-        for (Map.Entry<String, String> entry : deviceToStorageGroupMap.entrySet()) {
-          String device = entry.getKey();
-          TSeriesPartitionSlot seriesPartitionSlot =
-              partitionExecutor.getSeriesPartitionSlot(device);
-          TRegionReplicaSet regionReplicaSet = schemaPartitionCache.getIfPresent(device);
-          if (null == regionReplicaSet) {
-            // if one device not find, then return cache miss.
-            logger.debug("Failed to find schema partition");
-            CacheMetricsRecorder.record(false, name);
-            return null;
-          }
-          String storageGroupName = deviceToStorageGroupMap.get(device);
-          if (!schemaPartitionMap.containsKey(storageGroupName)) {
-            schemaPartitionMap.put(storageGroupName, new HashMap<>());
-          }
-          Map<TSeriesPartitionSlot, TRegionReplicaSet> regionReplicaSetMap =
-              schemaPartitionMap.get(storageGroupName);
-          regionReplicaSetMap.put(seriesPartitionSlot, regionReplicaSet);
-        }
-        logger.debug("Hit schema partition");
-        // cache hit
-        CacheMetricsRecorder.record(true, name);
-        return new SchemaPartition(
-            schemaPartitionMap, seriesSlotExecutorName, seriesPartitionSlotNum);
-      } finally {
-        schemaPartitionCacheLock.readLock().unlock();
-      }
-    }
-
-    /** get dataPartition by query param map */
-    public DataPartition getDataPartition(
-        Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
-      dataPartitionCacheLock.readLock().lock();
-      try {
-        String name = "DataPartition";
-        if (sgNameToQueryParamsMap.size() == 0) {
-          CacheMetricsRecorder.record(false, name);
-          return null;
-        }
-        Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
-            dataPartitionMap = new HashMap<>();
-        // check cache for each storage group
-        for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
-            sgNameToQueryParamsMap.entrySet()) {
-          String storageGroupName = entry.getKey();
-          if (!dataPartitionMap.containsKey(storageGroupName)) {
-            dataPartitionMap.put(storageGroupName, new HashMap<>());
-          }
-          Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
-              seriesSlotToTimePartitionMap = dataPartitionMap.get(storageGroupName);
-          // check cache for each query param
-          for (DataPartitionQueryParam dataPartitionQueryParam : entry.getValue()) {
-            if (null == dataPartitionQueryParam.getTimePartitionSlotList()
-                || 0 == dataPartitionQueryParam.getTimePartitionSlotList().size()) {
-              // if query all data, cache miss
-              logger.debug("Failed to find data partition");
-              CacheMetricsRecorder.record(false, name);
-              return null;
-            }
-            TSeriesPartitionSlot seriesPartitionSlot =
-                partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath());
-            if (!seriesSlotToTimePartitionMap.containsKey(seriesPartitionSlot)) {
-              seriesSlotToTimePartitionMap.put(seriesPartitionSlot, new HashMap<>());
-            }
-            Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotListMap =
-                seriesSlotToTimePartitionMap.get(seriesPartitionSlot);
-            // check cache for each time partition
-            for (TTimePartitionSlot timePartitionSlot :
-                dataPartitionQueryParam.getTimePartitionSlotList()) {
-              DataPartitionCacheKey dataPartitionCacheKey =
-                  new DataPartitionCacheKey(seriesPartitionSlot, timePartitionSlot);
-              List<TRegionReplicaSet> regionReplicaSets =
-                  dataPartitionCache.getIfPresent(dataPartitionCacheKey);
-              if (null == regionReplicaSets) {
-                // if one time partition not find, cache miss
-                logger.debug("Failed to find data partition");
-                CacheMetricsRecorder.record(false, name);
-                return null;
-              }
-              timePartitionSlotListMap.put(timePartitionSlot, regionReplicaSets);
-            }
-          }
-        }
-        logger.debug("Hit data partition");
-        // cache hit
-        CacheMetricsRecorder.record(true, name);
-        return new DataPartition(dataPartitionMap, seriesSlotExecutorName, seriesPartitionSlotNum);
-      } finally {
-        dataPartitionCacheLock.readLock().unlock();
-      }
-    }
-
-    /** update schemaPartitionCache by schemaPartition. */
-    public void updateSchemaPartitionCache(List<String> devices, SchemaPartition schemaPartition) {
-      schemaPartitionCacheLock.writeLock().lock();
-      try {
-        Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> storageGroupPartitionMap =
-            schemaPartition.getSchemaPartitionMap();
-        Set<String> storageGroupNames = storageGroupPartitionMap.keySet();
-        for (String device : devices) {
-          if (!device.contains("*")) {
-            String storageGroup = null;
-            for (String storageGroupName : storageGroupNames) {
-              if (PathUtils.isStartWith(device, storageGroupName)) {
-                storageGroup = storageGroupName;
-                break;
-              }
-            }
-            if (null == storageGroup) {
-              // device not exist
-              continue;
-            }
-            TSeriesPartitionSlot seriesPartitionSlot =
-                partitionExecutor.getSeriesPartitionSlot(device);
-            TRegionReplicaSet regionReplicaSet =
-                storageGroupPartitionMap.get(storageGroup).getOrDefault(seriesPartitionSlot, null);
-            if (null == regionReplicaSet) {
-              logger.error(
-                  "Failed to get the regionReplicaSet of {} when update SchemaPartitionCache",
-                  device);
-              continue;
-            }
-            schemaPartitionCache.put(device, regionReplicaSet);
-          }
-        }
-      } finally {
-        schemaPartitionCacheLock.writeLock().unlock();
-      }
-    }
-
-    /** update dataPartitionCache by dataPartition */
-    public void updateDataPartitionCache(DataPartition dataPartition) {
-      dataPartitionCacheLock.writeLock().lock();
-      try {
-        for (Map.Entry<
-                String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
-            entry1 : dataPartition.getDataPartitionMap().entrySet()) {
-          for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
-              entry2 : entry1.getValue().entrySet()) {
-            TSeriesPartitionSlot seriesPartitionSlot = entry2.getKey();
-            for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry3 :
-                entry2.getValue().entrySet()) {
-              DataPartitionCacheKey dataPartitionCacheKey =
-                  new DataPartitionCacheKey(seriesPartitionSlot, entry3.getKey());
-              dataPartitionCache.put(dataPartitionCacheKey, entry3.getValue());
-            }
-          }
-        }
-      } finally {
-        dataPartitionCacheLock.writeLock().unlock();
-      }
-    }
-
-    /** invalid schemaPartitionCache by device */
-    public void invalidSchemaPartitionCache(String device) {
-      // TODO should be called in two situation: 1. redirect status 2. config node trigger
-      schemaPartitionCacheLock.writeLock().lock();
-      try {
-        schemaPartitionCache.invalidate(device);
-      } finally {
-        schemaPartitionCacheLock.writeLock().unlock();
-      }
-    }
-
-    /** invalid dataPartitionCache by seriesPartitionSlot, timePartitionSlot */
-    public void invalidDataPartitionCache(
-        TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
-      dataPartitionCacheLock.writeLock().lock();
-      // TODO should be called in two situation: 1. redirect status 2. config node trigger
-      try {
-        DataPartitionCacheKey dataPartitionCacheKey =
-            new DataPartitionCacheKey(seriesPartitionSlot, timePartitionSlot);
-        dataPartitionCache.invalidate(dataPartitionCacheKey);
-      } finally {
-        dataPartitionCacheLock.writeLock().unlock();
-      }
-    }
-
-    /** invalid schemaPartitionCache by device */
-    public void invalidAllSchemaPartitionCache() {
-      schemaPartitionCacheLock.writeLock().lock();
-      try {
-        schemaPartitionCache.invalidateAll();
-      } finally {
-        schemaPartitionCacheLock.writeLock().unlock();
-      }
-    }
-
-    /** invalid dataPartitionCache by seriesPartitionSlot, timePartitionSlot */
-    public void invalidAllDataPartitionCache() {
-      dataPartitionCacheLock.writeLock().lock();
-      try {
-        dataPartitionCache.invalidateAll();
-      } finally {
-        dataPartitionCacheLock.writeLock().unlock();
-      }
-    }
-
-    public boolean updateGroupIdToReplicaSetMap(TRegionRouteReq req) {
-      long timestamp = req.getTimestamp();
-      boolean result = (timestamp == latestUpdateTime.accumulateAndGet(timestamp, Math::max));
-      // if timestamp is greater than latestUpdateTime, then update
-      if (result) {
-        groupIdToReplicaSetMap.clear();
-        groupIdToReplicaSetMap.putAll(req.getRegionRouteMap());
-      }
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return "PartitionCache{"
-          + "cacheSize="
-          + cacheSize
-          + ", storageGroupCache="
-          + storageGroupCache
-          + ", schemaPartitionCache="
-          + schemaPartitionCache
-          + ", dataPartitionCache="
-          + dataPartitionCache
-          + '}';
-    }
-  }
-
-  private static class DataPartitionCacheKey {
-    private TSeriesPartitionSlot seriesPartitionSlot;
-    private TTimePartitionSlot timePartitionSlot;
-
-    public DataPartitionCacheKey(
-        TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
-      this.seriesPartitionSlot = seriesPartitionSlot;
-      this.timePartitionSlot = timePartitionSlot;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      DataPartitionCacheKey that = (DataPartitionCacheKey) o;
-      return Objects.equals(seriesPartitionSlot, that.seriesPartitionSlot)
-          && Objects.equals(timePartitionSlot, that.timePartitionSlot);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(seriesPartitionSlot, timePartitionSlot);
-    }
+    return new DataPartition(
+        regionReplicaSet,
+        config.getSeriesPartitionExecutorClass(),
+        config.getSeriesPartitionSlotNum());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
new file mode 100644
index 0000000000..bb9481570a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.cache;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.service.metrics.recorder.CacheMetricsRecorder;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class PartitionCache {
+  private static final Logger logger = LoggerFactory.getLogger(PartitionCache.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final List<String> ROOT_PATH = Arrays.asList("root", "**");
+  private static final String STORAGE_GROUP_CACHE_NAME = "StorageGroup";
+  private static final String SCHEMA_PARTITION_CACHE_NAME = "SchemaPartition";
+  private static final String DATA_PARTITION_CACHE_NAME = "DataPartition";
+
+  /**
+   * class name of the series partition executor; together with seriesPartitionSlotNum it is used
+   * to build partitionExecutor, which maps a device to its series partition slot
+   */
+  private final String seriesSlotExecutorName = config.getSeriesPartitionExecutorClass();
+
+  private final int seriesPartitionSlotNum = config.getSeriesPartitionSlotNum();
+  private final SeriesPartitionExecutor partitionExecutor;
+
+  /** the maximum size of schemaPartitionCache and of dataPartitionCache (read from config) */
+  private final int cacheSize = config.getPartitionCacheSize();
+  /** the names of all storage groups known locally */
+  private final Set<String> storageGroupCache = Collections.synchronizedSet(new HashSet<>());
+  /** storage group name -> schemaPartitionTable */
+  private final Cache<String, SchemaPartitionTable> schemaPartitionCache;
+  /** storage group name -> dataPartitionTable */
+  private final Cache<String, DataPartitionTable> dataPartitionCache;
+
+  /** the largest timestamp passed to updateGroupIdToReplicaSetMap so far */
+  private final AtomicLong latestUpdateTime = new AtomicLong(0);
+  /** TConsensusGroupId -> TRegionReplicaSet */
+  private final Map<TConsensusGroupId, TRegionReplicaSet> groupIdToReplicaSetMap =
+      new ConcurrentHashMap<>();
+
+  /** one read-write lock per cache: storage group, schema partition and data partition */
+  private final ReentrantReadWriteLock storageGroupCacheLock = new ReentrantReadWriteLock();
+
+  private final ReentrantReadWriteLock schemaPartitionCacheLock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock dataPartitionCacheLock = new ReentrantReadWriteLock();
+
+  private final IClientManager<PartitionRegionId, ConfigNodeClient> configNodeClientManager =
+      new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+
+  /**
+   * Build the two size-bounded partition caches and the executor that maps a device to its series
+   * partition slot. All three are configured from values already read from the node config.
+   */
+  public PartitionCache() {
+    schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+    dataPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+    partitionExecutor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            seriesSlotExecutorName, seriesPartitionSlotNum);
+  }
+
+  // region storage group cache
+
+  /**
+   * Group the given devices by the storage group they belong to.
+   *
+   * @param devicePaths the devices to look up
+   * @param isAutoCreate whether missing storage groups should be created when the lookup misses
+   * @return storage group name -> devices of that storage group
+   */
+  public Map<String, List<String>> getStorageGroupToDevice(
+      List<String> devicePaths, boolean isAutoCreate) {
+    StorageGroupCacheResult<List<String>> groupedDevices =
+        new StorageGroupCacheResult<List<String>>() {
+          @Override
+          public void put(String device, String storageGroupName) {
+            // append the device to its storage group's list, creating the list on first use
+            map.computeIfAbsent(storageGroupName, k -> new ArrayList<>()).add(device);
+          }
+        };
+    getStorageGroupCacheResult(groupedDevices, devicePaths, isAutoCreate);
+    return groupedDevices.getMap();
+  }
+
+  /**
+   * Resolve the storage group of each given device.
+   *
+   * @param devicePaths the devices to look up
+   * @param isAutoCreate whether missing storage groups should be created when the lookup misses
+   * @return device -> name of the storage group it belongs to
+   */
+  public Map<String, String> getDeviceToStorageGroup(
+      List<String> devicePaths, boolean isAutoCreate) {
+    StorageGroupCacheResult<String> deviceLookup =
+        new StorageGroupCacheResult<String>() {
+          @Override
+          public void put(String device, String storageGroupName) {
+            // one storage group per device, later entries overwrite earlier ones
+            map.put(device, storageGroupName);
+          }
+        };
+    getStorageGroupCacheResult(deviceLookup, devicePaths, isAutoCreate);
+    return deviceLookup.getMap();
+  }
+
+  /**
+   * Find the cached storage group that the given device path starts with.
+   *
+   * @param devicePath the path of device
+   * @return the first matching storage group name in iteration order, or null on cache miss
+   */
+  private String getStorageGroupName(String devicePath) {
+    String matched = null;
+    // iterating a synchronized set requires holding its monitor for the whole traversal
+    synchronized (storageGroupCache) {
+      for (String candidate : storageGroupCache) {
+        if (PathUtils.isStartWith(devicePath, candidate)) {
+          matched = candidate;
+          break;
+        }
+      }
+    }
+    return matched;
+  }
+
+  /**
+   * Fetch all storage groups from the config node and add them to the storage group cache, unless
+   * the given devices can already be resolved locally.
+   *
+   * <p>The write lock is acquired before the config node client is borrowed. Acquiring it inside
+   * the try-with-resources body would let the finally block unlock a lock this thread never took
+   * whenever borrowing the client throws, replacing the real error with an
+   * IllegalMonitorStateException.
+   *
+   * <p>If the config node answers with a non-success status the cache is left unchanged and no
+   * exception is raised; the caller detects the miss on its next lookup.
+   *
+   * @param result the result of get storage group cache; it is reset and refilled by the re-check
+   * @param devicePaths the devices that need to hit
+   * @throws IOException if borrowing the config node client fails
+   * @throws TException if the request to the config node fails
+   */
+  private void fetchStorageGroupAndUpdateCache(
+      StorageGroupCacheResult<?> result, List<String> devicePaths) throws IOException, TException {
+    storageGroupCacheLock.writeLock().lock();
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      // re-check under the write lock: another thread may have filled the cache meanwhile
+      result.reset();
+      getStorageGroupMap(result, devicePaths, true);
+      if (!result.isSuccess()) {
+        TStorageGroupSchemaResp storageGroupSchemaResp =
+            client.getMatchedStorageGroupSchemas(ROOT_PATH);
+        if (storageGroupSchemaResp.getStatus().getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          Set<String> storageGroupNames =
+              storageGroupSchemaResp.getStorageGroupSchemaMap().keySet();
+          // update all storage group into cache
+          updateStorageCache(storageGroupNames);
+        }
+      }
+    } finally {
+      storageGroupCacheLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Create the storage groups that the missed devices need and add them to the storage group
+   * cache.
+   *
+   * <p>The storage group name of a missed device is derived from the device path and the
+   * configured default storage group level. Storage groups are created one by one; on the first
+   * failure the ones created so far are cached and the method aborts.
+   *
+   * <p>The write lock is acquired before the config node client is borrowed. Acquiring it inside
+   * the try-with-resources body would let the finally block unlock a lock this thread never took
+   * whenever borrowing the client throws, replacing the real error with an
+   * IllegalMonitorStateException.
+   *
+   * @param result the result of get storage group cache; it is reset and refilled by the re-check
+   * @param devicePaths the devices that need to hit
+   * @throws IOException if borrowing the config node client fails
+   * @throws MetadataException if a storage group path cannot be derived from a device path
+   * @throws TException if a request to the config node fails
+   * @throws RuntimeException if the config node refuses to create a storage group
+   */
+  private void createStorageGroupAndUpdateCache(
+      StorageGroupCacheResult<?> result, List<String> devicePaths)
+      throws IOException, MetadataException, TException {
+    storageGroupCacheLock.writeLock().lock();
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      // try to check whether storage group need to be created
+      result.reset();
+      // non-fail-fast lookup so that every missed device is collected
+      getStorageGroupMap(result, devicePaths, false);
+      if (!result.isSuccess()) {
+        // derive the storage groups to create from the missed devices
+        Set<String> storageGroupNamesNeedCreated = new HashSet<>();
+        for (String devicePath : result.getMissedDevices()) {
+          PartialPath storageGroupNameNeedCreated =
+              MetaUtils.getStorageGroupPathByLevel(
+                  new PartialPath(devicePath), config.getDefaultStorageGroupLevel());
+          storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
+        }
+
+        // try to create storage groups one by one until done or one storage group fail
+        Set<String> successFullyCreatedStorageGroup = new HashSet<>();
+        for (String storageGroupName : storageGroupNamesNeedCreated) {
+          TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
+          storageGroupSchema.setName(storageGroupName);
+          TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
+          TSStatus tsStatus = client.setStorageGroup(req);
+          if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
+            successFullyCreatedStorageGroup.add(storageGroupName);
+          } else {
+            // keep what was created before the failure, then abort
+            updateStorageCache(successFullyCreatedStorageGroup);
+            logger.warn(
+                "[{} Cache] failed to create storage group {}",
+                STORAGE_GROUP_CACHE_NAME,
+                storageGroupName);
+            throw new RuntimeException(new IoTDBException(tsStatus.message, tsStatus.code));
+          }
+        }
+        // try to update storage group cache when all storage groups has already been created
+        updateStorageCache(storageGroupNamesNeedCreated);
+      }
+    } finally {
+      storageGroupCacheLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Resolve the storage group of every given device from the local cache in a single pass.
+   *
+   * <p>The result is reset first. Each resolved device is handed to {@code result.put}. If any
+   * device cannot be resolved the result is marked as failed; entries already put for earlier
+   * devices stay in the result.
+   *
+   * @param result contains result(boolean), failed devices and the map
+   * @param devicePaths the devices that need to hit
+   * @param failFast if true, stop at the first missed device without recording it; if false,
+   *     check every device and record each missed one in the result
+   */
+  private void getStorageGroupMap(
+      StorageGroupCacheResult<?> result, List<String> devicePaths, boolean failFast) {
+    // lock before entering try so that finally never unlocks a lock that was not acquired
+    storageGroupCacheLock.readLock().lock();
+    try {
+      // reset result before try
+      result.reset();
+      boolean status = true;
+      for (String devicePath : devicePaths) {
+        String storageGroupName = getStorageGroupName(devicePath);
+        if (null == storageGroupName) {
+          logger.debug(
+              "[{} Cache] miss when search device {}", STORAGE_GROUP_CACHE_NAME, devicePath);
+          status = false;
+          if (failFast) {
+            break;
+          } else {
+            result.addMissedDevice(devicePath);
+          }
+        } else {
+          result.put(devicePath, storageGroupName);
+        }
+      }
+      if (status) {
+        // only report a hit when every device was resolved
+        logger.debug("[{} Cache] hit when search device {}", STORAGE_GROUP_CACHE_NAME, devicePaths);
+      } else {
+        // setFailed the result when miss
+        result.setFailed();
+      }
+      CacheMetricsRecorder.record(status, STORAGE_GROUP_CACHE_NAME);
+    } finally {
+      storageGroupCacheLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Fill the result with the storage groups of the given devices, in up to three tries:
+   *
+   * <ol>
+   *   <li>look up the local cache;
+   *   <li>on miss, fetch all storage groups from the config node and look up again;
+   *   <li>on miss and if isAutoCreate is set, create the missing storage groups and look up again.
+   * </ol>
+   *
+   * <p>If any device path contains "*", the method returns immediately and leaves the result
+   * untouched. If isAutoCreate is false and the second try still misses, the method returns with
+   * the result marked as failed instead of throwing.
+   *
+   * @param result contains result, failed devices and map
+   * @param devicePaths the devices that need to hit
+   * @param isAutoCreate whether auto create storage group when device miss
+   * @throws StatementAnalyzeException if the third try still misses, or if talking to the config
+   *     node or deriving a storage group path fails
+   */
+  private void getStorageGroupCacheResult(
+      StorageGroupCacheResult<?> result, List<String> devicePaths, boolean isAutoCreate) {
+    // miss when devicePath contains *
+    for (String devicePath : devicePaths) {
+      if (devicePath.contains("*")) {
+        return;
+      }
+    }
+    // first try to hit storage group in fast-fail way
+    getStorageGroupMap(result, devicePaths, true);
+    if (!result.isSuccess()) {
+      try {
+        // try to fetch storage group from config node when miss
+        fetchStorageGroupAndUpdateCache(result, devicePaths);
+        // second try to hit storage group in fast-fail way
+        getStorageGroupMap(result, devicePaths, true);
+        if (!result.isSuccess() && isAutoCreate) {
+          // try to auto create storage group of failed device
+          createStorageGroupAndUpdateCache(result, devicePaths);
+          // third try to hit storage group in fast-fail way
+          getStorageGroupMap(result, devicePaths, true);
+          if (!result.isSuccess()) {
+            throw new StatementAnalyzeException("Failed to get Storage Group Map in three try.");
+          }
+        }
+      } catch (TException | MetadataException | IOException e) {
+        throw new StatementAnalyzeException(
+            "An error occurred when executing getDeviceToStorageGroup():" + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Add the given storage group names to the storage group cache.
+   *
+   * @param storageGroupNames the storage group names to add
+   */
+  public void updateStorageCache(Set<String> storageGroupNames) {
+    final ReentrantReadWriteLock.WriteLock writeLock = storageGroupCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      storageGroupCache.addAll(storageGroupNames);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Remove the given storage groups from the storage group cache.
+   *
+   * @param storageGroupNames the storage groups to remove
+   */
+  public void removeFromStorageGroupCache(List<String> storageGroupNames) {
+    final ReentrantReadWriteLock.WriteLock writeLock = storageGroupCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      // remove one name at a time, exactly one remove call per list element
+      storageGroupNames.forEach(storageGroupCache::remove);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Remove every entry from the storage group cache. */
+  public void removeFromStorageGroupCache() {
+    final ReentrantReadWriteLock.WriteLock writeLock = storageGroupCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      storageGroupCache.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  // endregion
+
+  // region replicaSet cache
+  /**
+   * Get the regionReplicaSet of a consensus group from the local map, refreshing the whole map
+   * from the confignode when the group is not present locally.
+   *
+   * <p>The refresh runs while holding the monitor of groupIdToReplicaSetMap. The presence check
+   * before it and the final lookup after it are done outside that monitor.
+   *
+   * <p>NOTE(review): because the final lookup is not synchronized with invalidReplicaSetCache()
+   * or with the clear-then-putAll in updateGroupIdToReplicaSetMap(), it looks possible for this
+   * method to return null if the map is cleared in between - confirm whether callers expect that.
+   *
+   * @param consensusGroupId the id of consensus group
+   * @return regionReplicaSet currently mapped to consensusGroupId
+   * @throws RuntimeException if the group is still absent from the map after the refresh attempt
+   * @throws StatementAnalyzeException if borrowing the client or requesting the latest region
+   *     route map raises an IOException or TException
+   */
+  public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId consensusGroupId) {
+    if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
+      // local miss: try to refresh the region route map from the confignode
+      synchronized (groupIdToReplicaSetMap) {
+        try (ConfigNodeClient client =
+            configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+          TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
+          if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) {
+            updateGroupIdToReplicaSetMap(resp.getTimestamp(), resp.getRegionRouteMap());
+          }
+          if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
+            // still absent: the response was not successful, was rejected, or lacks this group
+            throw new RuntimeException(
+                "Failed to get replicaSet of consensus group[id= " + consensusGroupId + "]");
+          }
+        } catch (IOException | TException e) {
+          throw new StatementAnalyzeException(
+              "An error occurred when executing getRegionReplicaSet():" + e.getMessage());
+        }
+      }
+    }
+    // look up the regionReplicaSet by consensusGroupId (outside the monitor)
+    return groupIdToReplicaSetMap.get(consensusGroupId);
+  }
+
+  /**
+   * Replace the content of groupIdToReplicaSetMap if the given timestamp is not older than the
+   * newest one seen so far.
+   *
+   * @param timestamp the timestamp of map that need to update
+   * @param map consensusGroupId to regionReplicaSet map
+   * @return true if the map was replaced, false if a newer timestamp had already been recorded
+   */
+  public boolean updateGroupIdToReplicaSetMap(
+      long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> map) {
+    // atomically raise latestUpdateTime to max(current, timestamp) and read the outcome
+    long newestTimestamp = latestUpdateTime.accumulateAndGet(timestamp, Math::max);
+    if (newestTimestamp != timestamp) {
+      // a strictly newer map has already been recorded
+      return false;
+    }
+    // timestamp is greater than or equal to the previous latestUpdateTime: replace the content
+    groupIdToReplicaSetMap.clear();
+    groupIdToReplicaSetMap.putAll(map);
+    return true;
+  }
+
+  /**
+   * Invalidate the replicaSet cache by removing every entry of groupIdToReplicaSetMap.
+   * latestUpdateTime is left unchanged.
+   */
+  public void invalidReplicaSetCache() {
+    groupIdToReplicaSetMap.clear();
+  }
+
+  // endregion
+
+  // region schema partition cache
+
+  /**
+   * Build a SchemaPartition for the given devices purely from the cache.
+   *
+   * <p>The lookup is all-or-nothing: an empty request, a storage group without a cached table, or
+   * a device whose series partition slot is not in the cached table all count as a cache miss.
+   *
+   * @param storageGroupToDeviceMap storage group to devices map
+   * @return SchemaPartition of storageGroupToDeviceMap, or null on cache miss
+   */
+  public SchemaPartition getSchemaPartition(Map<String, List<String>> storageGroupToDeviceMap) {
+    schemaPartitionCacheLock.readLock().lock();
+    try {
+      if (storageGroupToDeviceMap.isEmpty()) {
+        CacheMetricsRecorder.record(false, SCHEMA_PARTITION_CACHE_NAME);
+        return null;
+      }
+      Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> resolvedPartitions =
+          new HashMap<>();
+
+      for (Map.Entry<String, List<String>> groupEntry : storageGroupToDeviceMap.entrySet()) {
+        String groupName = groupEntry.getKey();
+        Map<TSeriesPartitionSlot, TRegionReplicaSet> slotToReplicaSet =
+            resolvedPartitions.computeIfAbsent(groupName, k -> new HashMap<>());
+        SchemaPartitionTable cachedTable = schemaPartitionCache.getIfPresent(groupName);
+        if (cachedTable == null) {
+          // no table cached for this storage group: the whole request is a miss
+          logger.debug(
+              "[{} Cache] miss when search storage group {}",
+              SCHEMA_PARTITION_CACHE_NAME,
+              groupName);
+          CacheMetricsRecorder.record(false, SCHEMA_PARTITION_CACHE_NAME);
+          return null;
+        }
+        Map<TSeriesPartitionSlot, TConsensusGroupId> slotToGroupId =
+            cachedTable.getSchemaPartitionMap();
+        for (String device : groupEntry.getValue()) {
+          TSeriesPartitionSlot slot = partitionExecutor.getSeriesPartitionSlot(device);
+          if (!slotToGroupId.containsKey(slot)) {
+            // the slot of this device is not cached: the whole request is a miss
+            logger.debug(
+                "[{} Cache] miss when search device {}", SCHEMA_PARTITION_CACHE_NAME, device);
+            CacheMetricsRecorder.record(false, SCHEMA_PARTITION_CACHE_NAME);
+            return null;
+          }
+          // translate the cached consensus group id into its current replica set
+          slotToReplicaSet.put(slot, getRegionReplicaSet(slotToGroupId.get(slot)));
+        }
+      }
+      logger.debug("[{} Cache] hit", SCHEMA_PARTITION_CACHE_NAME);
+      CacheMetricsRecorder.record(true, SCHEMA_PARTITION_CACHE_NAME);
+      return new SchemaPartition(
+          resolvedPartitions, seriesSlotExecutorName, seriesPartitionSlotNum);
+    } finally {
+      schemaPartitionCacheLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Merge the given slot assignments into schemaPartitionCache, creating the cached table of a
+   * storage group on first use.
+   *
+   * @param schemaPartitionTable storage group to SeriesPartitionSlot to ConsensusGroupId map
+   */
+  public void updateSchemaPartitionCache(
+      Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable) {
+    schemaPartitionCacheLock.writeLock().lock();
+    try {
+      for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> groupEntry :
+          schemaPartitionTable.entrySet()) {
+        String groupName = groupEntry.getKey();
+        SchemaPartitionTable cachedTable = schemaPartitionCache.getIfPresent(groupName);
+        if (cachedTable == null) {
+          cachedTable = new SchemaPartitionTable();
+          schemaPartitionCache.put(groupName, cachedTable);
+        }
+        // new assignments overwrite cached ones for the same slot
+        cachedTable.getSchemaPartitionMap().putAll(groupEntry.getValue());
+      }
+    } finally {
+      schemaPartitionCacheLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Invalidate the cached schema partition table of one storage group.
+   *
+   * @param storageGroupName the storage group whose cached table is dropped
+   */
+  public void invalidSchemaPartitionCache(String storageGroupName) {
+    final ReentrantReadWriteLock.WriteLock writeLock = schemaPartitionCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      schemaPartitionCache.invalidate(storageGroupName);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Invalidate the cached schema partition tables of all storage groups. */
+  public void invalidAllSchemaPartitionCache() {
+    final ReentrantReadWriteLock.WriteLock writeLock = schemaPartitionCacheLock.writeLock();
+    writeLock.lock();
+    try {
+      schemaPartitionCache.invalidateAll();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  // endregion
+
+  // region data partition cache
+
+  /**
+   * Build a DataPartition for the given query params purely from the cache.
+   *
+   * <p>The lookup is all-or-nothing: an empty request, or any storage group whose params cannot
+   * be fully served from the cache, counts as a cache miss.
+   *
+   * @param storageGroupToQueryParamsMap storage group to dataPartitionQueryParam map
+   * @return DataPartition of storageGroupToQueryParamsMap, or null on cache miss
+   */
+  public DataPartition getDataPartition(
+      Map<String, List<DataPartitionQueryParam>> storageGroupToQueryParamsMap) {
+    dataPartitionCacheLock.readLock().lock();
+    try {
+      if (storageGroupToQueryParamsMap.isEmpty()) {
+        CacheMetricsRecorder.record(false, DATA_PARTITION_CACHE_NAME);
+        return null;
+      }
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          resolvedPartitions = new HashMap<>();
+      for (Map.Entry<String, List<DataPartitionQueryParam>> groupEntry :
+          storageGroupToQueryParamsMap.entrySet()) {
+        boolean groupHit =
+            getStorageGroupDataPartition(
+                resolvedPartitions, groupEntry.getKey(), groupEntry.getValue());
+        if (!groupHit) {
+          // one storage group missing is enough to fail the whole request
+          CacheMetricsRecorder.record(false, DATA_PARTITION_CACHE_NAME);
+          return null;
+        }
+      }
+      logger.debug("[{} Cache] hit", DATA_PARTITION_CACHE_NAME);
+      CacheMetricsRecorder.record(true, DATA_PARTITION_CACHE_NAME);
+      return new DataPartition(resolvedPartitions, seriesSlotExecutorName, seriesPartitionSlotNum);
+    } finally {
+      dataPartitionCacheLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Collect the cached data partition of one storage group into dataPartitionMap.
+   *
+   * <p>Once the storage group is found in the cache, an entry for it is created in
+   * dataPartitionMap even if a later device or time partition lookup misses; getDataPartition
+   * drops the whole map in that case.
+   *
+   * @param dataPartitionMap result
+   * @param storageGroupName storage group that need to get
+   * @param dataPartitionQueryParams specific query params of data partition
+   * @return whether hit
+   */
+  private boolean getStorageGroupDataPartition(
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          dataPartitionMap,
+      String storageGroupName,
+      List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    DataPartitionTable cachedTable = dataPartitionCache.getIfPresent(storageGroupName);
+    if (cachedTable == null) {
+      // nothing is cached for this storage group
+      logger.debug(
+          "[{} Cache] miss when search storage group {}",
+          DATA_PARTITION_CACHE_NAME,
+          storageGroupName);
+      return false;
+    }
+    Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedSeriesSlots =
+        cachedTable.getDataPartitionMap();
+    // result entry of this storage group, created on first use
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> resultOfGroup =
+        dataPartitionMap.computeIfAbsent(storageGroupName, k -> new HashMap<>());
+    // check cache for each device; allMatch stops evaluating at the first miss
+    return dataPartitionQueryParams.stream()
+        .allMatch(param -> getDeviceDataPartition(resultOfGroup, param, cachedSeriesSlots));
+  }
+
+  /**
+   * Collect the cached data partition of one device into seriesSlotToTimePartitionMap.
+   * The series partition slot of the device is obtained from partitionExecutor.
+   *
+   * @param seriesSlotToTimePartitionMap result
+   * @param dataPartitionQueryParam specific query param of data partition
+   * @param cachedStorageGroupPartitionMap all cached data partition map of related storage group
+   * @return whether hit
+   */
+  private boolean getDeviceDataPartition(
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+          seriesSlotToTimePartitionMap,
+      DataPartitionQueryParam dataPartitionQueryParam,
+      Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedStorageGroupPartitionMap) {
+    TSeriesPartitionSlot slotOfDevice =
+        partitionExecutor.getSeriesPartitionSlot(dataPartitionQueryParam.getDevicePath());
+    SeriesPartitionTable cachedSeriesTable = cachedStorageGroupPartitionMap.get(slotOfDevice);
+    if (cachedSeriesTable == null) {
+      // nothing is cached for the series partition slot of this device
+      logger.debug(
+          "[{} Cache] miss when search device {}",
+          DATA_PARTITION_CACHE_NAME,
+          dataPartitionQueryParam.getDevicePath());
+      return false;
+    }
+    Map<TTimePartitionSlot, List<TConsensusGroupId>> cachedTimeSlots =
+        cachedSeriesTable.getSeriesPartitionMap();
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> resultOfDevice =
+        seriesSlotToTimePartitionMap.computeIfAbsent(slotOfDevice, k -> new HashMap<>());
+    // check cache for each time partition
+    for (TTimePartitionSlot requestedSlot : dataPartitionQueryParam.getTimePartitionSlotList()) {
+      boolean hit = getTimeSlotDataPartition(resultOfDevice, requestedSlot, cachedTimeSlots);
+      if (!hit) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Collect the cached data partition of one time partition slot into timePartitionSlotListMap.
+   *
+   * @param timePartitionSlotListMap result
+   * @param timePartitionSlot the specific time partition slot of data partition
+   * @param cachedTimePartitionSlot all cached time slot map of related device
+   * @return whether hit
+   */
+  private boolean getTimeSlotDataPartition(
+      Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotListMap,
+      TTimePartitionSlot timePartitionSlot,
+      Map<TTimePartitionSlot, List<TConsensusGroupId>> cachedTimePartitionSlot) {
+    List<TConsensusGroupId> cachedGroupIds = cachedTimePartitionSlot.get(timePartitionSlot);
+    if (cachedGroupIds != null && !cachedGroupIds.isEmpty()) {
+      // hit: translate every cached consensus group id through getRegionReplicaSet
+      List<TRegionReplicaSet> replicaSets = new LinkedList<>();
+      cachedGroupIds.forEach(groupId -> replicaSets.add(getRegionReplicaSet(groupId)));
+      timePartitionSlotListMap.put(timePartitionSlot, replicaSets);
+      return true;
+    }
+    // miss: the time partition is not cached, or cached without any consensus group id
+    logger.debug(
+        "[{} Cache] miss when search time partition {}",
+        DATA_PARTITION_CACHE_NAME,
+        timePartitionSlot);
+    return false;
+  }
+
+  /**
+   * Merge freshly fetched data partition information into dataPartitionCache.
+   *
+   * <p>Storage groups and series partition slots not cached yet are added; for a series partition
+   * slot that is already cached, incoming time partitions overwrite the cached ones.
+   *
+   * @param dataPartitionTable storage group to seriesPartitionSlot to timePartitionSlot to
+   *     ConsensusGroupId map
+   */
+  public void updateDataPartitionCache(
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+          dataPartitionTable) {
+    dataPartitionCacheLock.writeLock().lock();
+    try {
+      dataPartitionTable.forEach(
+          (storageGroup, incomingSeriesSlots) -> {
+            DataPartitionTable cachedTable = dataPartitionCache.getIfPresent(storageGroup);
+            if (cachedTable == null) {
+              // storage group not cached yet: start from an empty table
+              cachedTable = new DataPartitionTable();
+              dataPartitionCache.put(storageGroup, cachedTable);
+            }
+            Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedSeriesSlots =
+                cachedTable.getDataPartitionMap();
+            incomingSeriesSlots.forEach(
+                (seriesSlot, incomingTimeSlots) -> {
+                  if (cachedSeriesSlots.containsKey(seriesSlot)) {
+                    // if device exists, then merge
+                    cachedSeriesSlots
+                        .get(seriesSlot)
+                        .getSeriesPartitionMap()
+                        .putAll(incomingTimeSlots);
+                  } else {
+                    // if device not exists, then add new seriesPartitionTable
+                    cachedSeriesSlots.put(seriesSlot, new SeriesPartitionTable(incomingTimeSlots));
+                  }
+                });
+          });
+    } finally {
+      dataPartitionCacheLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Invalidate the dataPartitionCache entry of one storage group while holding the write lock.
+   *
+   * @param storageGroup the storage group whose cached data partition is invalidated
+   */
+  public void invalidDataPartitionCache(String storageGroup) {
+    dataPartitionCacheLock.writeLock().lock();
+    try {
+      dataPartitionCache.invalidate(storageGroup);
+    } finally {
+      dataPartitionCacheLock.writeLock().unlock();
+    }
+  }
+
+  /** Invalidate every entry of dataPartitionCache while holding the write lock. */
+  public void invalidAllDataPartitionCache() {
+    dataPartitionCacheLock.writeLock().lock();
+    try {
+      dataPartitionCache.invalidateAll();
+    } finally {
+      dataPartitionCacheLock.writeLock().unlock();
+    }
+  }
+
+  // endregion
+
+  public void invalidAllCache() { // invalidates each cache of this PartitionCache in turn
+    logger.debug("[Partition Cache] invalid");
+    removeFromStorageGroupCache(); // presumably empties storageGroupCache — TODO confirm
+    invalidAllDataPartitionCache(); // takes the dataPartitionCache write lock itself
+    invalidAllSchemaPartitionCache(); // takes the schemaPartitionCache write lock itself
+    invalidReplicaSetCache(); // presumably clears groupIdToReplicaSetMap — TODO confirm
+    logger.debug("[Partition Cache] is invalid:{}", this); // logs the state left afterwards
+  }
+
+  @Override
+  public String toString() {
+    return "PartitionCache{"
+        + "cacheSize="
+        + cacheSize
+        + ", storageGroupCache="
+        + storageGroupCache
+        + ", replicaSetCache="
+        + groupIdToReplicaSetMap
+        + ", schemaPartitionCache="
+        + schemaPartitionCache
+        + ", dataPartitionCache="
+        + dataPartitionCache
+        + '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/StorageGroupCacheResult.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/StorageGroupCacheResult.java
new file mode 100644
index 0000000000..29c811d1e0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/StorageGroupCacheResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.analyze.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class StorageGroupCacheResult<V> {
+  /** whether the lookup is still considered successful; starts as true */
+  private boolean success = true;
+  /** the list of devices that miss */
+  private List<String> missedDevices = new ArrayList<>();
+  /** result map; replaced by an empty map when setFailed() or reset() is called */
+  protected Map<String, V> map = new HashMap<>();
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public void setSuccess(boolean success) {
+    this.success = success;
+  }
+
+  public List<String> getMissedDevices() {
+    return missedDevices;
+  }
+
+  public void addMissedDevice(String missedDevice) {
+    this.missedDevices.add(missedDevice);
+  }
+
+  public Map<String, V> getMap() {
+    return map;
+  }
+
+  public abstract void put(String device, String storageGroupName);
+
+  /** set failed and replace the map with an empty one; the missed devices are kept */
+  public void setFailed() {
+    this.success = false;
+    this.map = new HashMap<>();
+  }
+
+  /** reset to the initial state: success, no missed devices, empty map */
+  public void reset() {
+    this.success = true;
+    this.missedDevices = new ArrayList<>();
+    this.map = new HashMap<>();
+  }
+}
diff --git a/server/src/test/resources/datanode1conf/iotdb-datanode.properties b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
index cd17728791..837faf2187 100644
--- a/server/src/test/resources/datanode1conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-datanode.properties
@@ -26,7 +26,7 @@ internal_port=9003
 data_region_consensus_port=40010
 schema_region_consensus_port=50030
 
-target_config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
+target_config_nodes=127.0.0.1:22277,127.0.0.1:22279,127.0.0.1:22281
 
 system_dir=target/datanode1/system
 data_dirs=target/datanode1/data
diff --git a/server/src/test/resources/datanode2conf/iotdb-datanode.properties b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
index 097e063007..cc5e6394ae 100644
--- a/server/src/test/resources/datanode2conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-datanode.properties
@@ -26,7 +26,7 @@ internal_port=9005
 data_region_consensus_port=40012
 schema_region_consensus_port=50032
 
-target_config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
+target_config_nodes=127.0.0.1:22277,127.0.0.1:22279,127.0.0.1:22281
 
 system_dir=target/datanode2/system
 data_dirs=target/datanode2/data
diff --git a/server/src/test/resources/datanode3conf/iotdb-datanode.properties b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
index 86be90b040..af5c60f450 100644
--- a/server/src/test/resources/datanode3conf/iotdb-datanode.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-datanode.properties
@@ -26,7 +26,7 @@ internal_port=9007
 data_region_consensus_port=40014
 schema_region_consensus_port=50034
 
-target_config_nodes=0.0.0.0:22277,0.0.0.0:22279,0.0.0.0:22281
+target_config_nodes=127.0.0.1:22277,127.0.0.1:22279,127.0.0.1:22281
 
 system_dir=target/datanode3/system
 data_dirs=target/datanode3/data