You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/27 05:30:13 UTC

[iotdb] branch master updated: [IOTDB-2960]Add partition cache (#5685)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf3dbb674 [IOTDB-2960]Add partition cache (#5685)
4bf3dbb674 is described below

commit 4bf3dbb6743ef68f921de03283890fd98d0c09a0
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Wed Apr 27 13:30:08 2022 +0800

    [IOTDB-2960]Add partition cache (#5685)
---
 .../resources/conf/iotdb-engine.properties         |   5 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../mpp/sql/analyze/ClusterPartitionFetcher.java   | 446 +++++++++++++++++++--
 .../mpp/sql/analyze/FakePartitionFetcherImpl.java  |  11 +
 .../db/mpp/sql/analyze/IPartitionFetcher.java      |   4 +
 .../sql/analyze/StandalonePartitionFetcher.java    |  11 +
 7 files changed, 455 insertions(+), 41 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ac905a0f53..27e06fe926 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -1012,6 +1012,11 @@ timestamp_precision=ms
 # Datatype: int
 # datanode_schema_cache_size=10000
 
+# Cache size of the partition cache.
+# This cache reduces how often partition information has to be fetched from the config node.
+# Datatype: int
+# partition_cache_size=10000
+
 ####################
 ### Schema File Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 527dd4e42b..edb9790918 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -896,6 +896,12 @@ public class IoTDBConfig {
    */
   private int dataNodeSchemaCacheSize = 10000;
 
+  /**
+   * Cache size of partition cache in {@link
+   * org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher}
+   */
+  private int partitionCacheSize = 10000;
+
   public float getUdfMemoryBudgetInMB() {
     return udfMemoryBudgetInMB;
   }
@@ -2829,4 +2835,12 @@ public class IoTDBConfig {
   public void setDataNodeSchemaCacheSize(int dataNodeSchemaCacheSize) {
     this.dataNodeSchemaCacheSize = dataNodeSchemaCacheSize;
   }
+
+  /** Returns the configured cache size of the partition cache. */
+  public int getPartitionCacheSize() {
+    return partitionCacheSize;
+  }
+
+  /** Sets the cache size of the partition cache. */
+  public void setPartitionCacheSize(int partitionCacheSize) {
+    this.partitionCacheSize = partitionCacheSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bbfd20b6c0..556c03b582 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1598,6 +1598,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "data_block_manager_keep_alive_time_in_ms",
                 Integer.toString(conf.getDataBlockManagerKeepAliveTimeInMs()))));
+
+    conf.setPartitionCacheSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "partition_cache_size", Integer.toString(conf.getPartitionCacheSize()))));
   }
 
   /** Get default encode algorithm by data type */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index b2ecaa612c..49c121742c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -30,30 +30,48 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
-
+  private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private final ConfigNodeClient client;
 
   private SeriesPartitionExecutor partitionExecutor;
 
+  private PartitionCache partitionCache;
+
   private static final class ClusterPartitionFetcherHolder {
     private static final ClusterPartitionFetcher INSTANCE = new ClusterPartitionFetcher();
 
@@ -70,59 +88,57 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     } catch (IoTDBConnectionException | BadNodeUrlException e) {
       throw new StatementAnalyzeException("Couldn't connect config node");
     }
-    initPartitionExecutor();
-  }
-
-  private void initPartitionExecutor() {
-    // TODO: @crz move to node-commons
-    IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-    try {
-      Class<?> executor = Class.forName(conf.getSeriesPartitionExecutorClass());
-      Constructor<?> executorConstructor = executor.getConstructor(int.class);
-      this.partitionExecutor =
-          (SeriesPartitionExecutor)
-              executorConstructor.newInstance(conf.getSeriesPartitionSlotNum());
-    } catch (ClassNotFoundException
-        | NoSuchMethodException
-        | InstantiationException
-        | IllegalAccessException
-        | InvocationTargetException e) {
-      throw new StatementAnalyzeException(
-          String.format(
-              "Couldn't Constructor SeriesPartitionExecutor class: %s",
-              conf.getSeriesPartitionExecutorClass()));
-    }
+    this.partitionExecutor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
+    this.partitionCache =
+        new PartitionCache(
+            config.getSeriesPartitionExecutorClass(), config.getSeriesPartitionSlotNum());
   }
 
   @Override
   public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
     try {
-      TSchemaPartitionResp schemaPartitionResp =
-          client.getSchemaPartition(constructSchemaPartitionReq(patternTree));
-      if (schemaPartitionResp.getStatus().getCode()
-          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        return parseSchemaPartitionResp(schemaPartitionResp);
+      patternTree.constructTree();
+      List<String> devicePaths = patternTree.findAllDevicePaths();
+      Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths);
+      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+      if (null == schemaPartition) {
+        TSchemaPartitionResp schemaPartitionResp =
+            client.getSchemaPartition(constructSchemaPartitionReq(patternTree));
+        if (schemaPartitionResp.getStatus().getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
+          partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+        }
       }
+      return schemaPartition;
     } catch (IoTDBConnectionException e) {
       throw new StatementAnalyzeException("An error occurred when executing getSchemaPartition()");
     }
-    return null;
   }
 
   @Override
   public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree) {
     try {
-      TSchemaPartitionResp schemaPartitionResp =
-          client.getOrCreateSchemaPartition(constructSchemaPartitionReq(patternTree));
-      if (schemaPartitionResp.getStatus().getCode()
-          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        return parseSchemaPartitionResp(schemaPartitionResp);
+      patternTree.constructTree();
+      List<String> devicePaths = patternTree.findAllDevicePaths();
+      Map<String, String> deviceToStorageGroupMap = getDeviceToStorageGroup(devicePaths);
+      SchemaPartition schemaPartition = partitionCache.getSchemaPartition(deviceToStorageGroupMap);
+      if (null == schemaPartition) {
+        TSchemaPartitionResp schemaPartitionResp =
+            client.getOrCreateSchemaPartition(constructSchemaPartitionReq(patternTree));
+        if (schemaPartitionResp.getStatus().getCode()
+            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          schemaPartition = parseSchemaPartitionResp(schemaPartitionResp);
+          partitionCache.updateSchemaPartitionCache(devicePaths, schemaPartition);
+        }
       }
+      return schemaPartition;
     } catch (IoTDBConnectionException e) {
       throw new StatementAnalyzeException(
           "An error occurred when executing getOrCreateSchemaPartition()");
     }
-    return null;
   }
 
   @Override
@@ -140,6 +156,27 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     return null;
   }
 
+  /**
+   * Looks up the data partition for the given query params, consulting the partition cache first
+   * and asking the config node only when the cache cannot answer.
+   *
+   * @param dataPartitionQueryParams query params, not yet grouped by storage group
+   * @return the data partition, or null if the cache missed and the config node did not report
+   *     success
+   */
+  @Override
+  public DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    try {
+      Map<String, List<DataPartitionQueryParam>> paramsByStorageGroup =
+          splitDataPartitionQueryParam(dataPartitionQueryParams);
+      DataPartition cached = partitionCache.getDataPartition(paramsByStorageGroup);
+      if (cached != null) {
+        return cached;
+      }
+      TDataPartitionResp resp =
+          client.getDataPartition(constructDataPartitionReq(paramsByStorageGroup));
+      if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return null;
+      }
+      // remember what the config node returned so the next lookup can be served locally
+      DataPartition fetched = parseDataPartitionResp(resp);
+      partitionCache.updateDataPartitionCache(fetched);
+      return fetched;
+    } catch (IoTDBConnectionException e) {
+      throw new StatementAnalyzeException("An error occurred when executing getDataPartition()");
+    }
+  }
+
   @Override
   public DataPartition getOrCreateDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
@@ -156,6 +193,101 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     return null;
   }
 
+  /**
+   * Same lookup as {@code getDataPartition(List)}, except that a cache miss is forwarded to the
+   * config node's getOrCreateDataPartition call instead of getDataPartition.
+   *
+   * @param dataPartitionQueryParams query params, not yet grouped by storage group
+   * @return the data partition, or null if the cache missed and the config node did not report
+   *     success
+   */
+  @Override
+  public DataPartition getOrCreateDataPartition(
+      List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    try {
+      Map<String, List<DataPartitionQueryParam>> paramsByStorageGroup =
+          splitDataPartitionQueryParam(dataPartitionQueryParams);
+      DataPartition cached = partitionCache.getDataPartition(paramsByStorageGroup);
+      if (cached != null) {
+        return cached;
+      }
+      TDataPartitionResp resp =
+          client.getOrCreateDataPartition(constructDataPartitionReq(paramsByStorageGroup));
+      if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return null;
+      }
+      // remember what the config node returned so the next lookup can be served locally
+      DataPartition fetched = parseDataPartitionResp(resp);
+      partitionCache.updateDataPartitionCache(fetched);
+      return fetched;
+    } catch (IoTDBConnectionException e) {
+      throw new StatementAnalyzeException(
+          "An error occurred when executing getOrCreateDataPartition()");
+    }
+  }
+
+  /**
+   * Maps every device path to the storage group it belongs to.
+   *
+   * <p>The storage group cache is consulted first. On a miss, all storage groups are fetched from
+   * the config node and the lookup is retried. For devices that still have no storage group, one
+   * is created at the configured default storage group level and the lookup is retried once more.
+   *
+   * @param devicePaths full paths of the devices to resolve
+   * @return device path -> storage group name; may be incomplete when the config node does not
+   *     report success while listing storage groups
+   * @throws StatementAnalyzeException if talking to the config node fails, a storage group path
+   *     cannot be derived from a device path, or devices are still unresolved after auto creation
+   */
+  private Map<String, String> getDeviceToStorageGroup(List<String> devicePaths) {
+    Map<String, String> deviceToStorageGroup = new HashMap<>();
+    // first try to hit cache
+    if (partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup)) {
+      return deviceToStorageGroup;
+    }
+    try {
+      TStorageGroupSchemaResp storageGroupSchemaResp =
+          client.getMatchedStorageGroupSchemas(Arrays.asList("root", "**"));
+      if (storageGroupSchemaResp.getStatus().getCode()
+          != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Behaviour is unchanged (return whatever the cache resolved), but the failure used to be
+        // completely silent; leave a trace for whoever debugs the resulting partial map.
+        logger.warn(
+            "Failed to fetch storage groups from config node, status: {}",
+            storageGroupSchemaResp.getStatus());
+        return deviceToStorageGroup;
+      }
+      // update all storage group into cache
+      partitionCache.updateStorageCache(
+          storageGroupSchemaResp.getStorageGroupSchemaMap().keySet());
+      // second try to hit cache
+      deviceToStorageGroup = new HashMap<>();
+      if (partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup)) {
+        return deviceToStorageGroup;
+      }
+      // try to auto create a storage group for every device that is still unresolved
+      Set<String> storageGroupNamesNeedCreated = new HashSet<>();
+      for (String devicePath : devicePaths) {
+        if (!deviceToStorageGroup.containsKey(devicePath)) {
+          PartialPath storageGroupNameNeedCreated =
+              MetaUtils.getStorageGroupPathByLevel(
+                  new PartialPath(devicePath), config.getDefaultStorageGroupLevel());
+          storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
+        }
+      }
+      for (String storageGroupName : storageGroupNamesNeedCreated) {
+        TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
+        storageGroupSchema.setName(storageGroupName);
+        client.setStorageGroup(new TSetStorageGroupReq(storageGroupSchema));
+      }
+      partitionCache.updateStorageCache(storageGroupNamesNeedCreated);
+      // third try to hit cache
+      deviceToStorageGroup = new HashMap<>();
+      if (!partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup)) {
+        throw new StatementAnalyzeException(
+            "Failed to get Storage Group Map when executing getOrCreateDataPartition()");
+      }
+      return deviceToStorageGroup;
+    } catch (IoTDBConnectionException | MetadataException e) {
+      // The rethrown exception carries no cause, so log it here or the root cause is lost.
+      logger.error("Failed to resolve storage groups of devices {}", devicePaths, e);
+      throw new StatementAnalyzeException(
+          "An error occurred when executing getOrCreateDataPartition()");
+    }
+  }
+
+  /**
+   * Groups the data partition query params by the storage group of their device.
+   *
+   * @param dataPartitionQueryParams query params to group
+   * @return storage group name -> query params whose device belongs to that storage group
+   */
+  private Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParam(
+      List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    List<String> devices =
+        dataPartitionQueryParams.stream()
+            .map(DataPartitionQueryParam::getDevicePath)
+            .collect(Collectors.toList());
+    Map<String, String> storageGroupByDevice = getDeviceToStorageGroup(devices);
+
+    Map<String, List<DataPartitionQueryParam>> grouped = new HashMap<>();
+    for (DataPartitionQueryParam param : dataPartitionQueryParams) {
+      grouped
+          .computeIfAbsent(storageGroupByDevice.get(param.getDevicePath()), sg -> new ArrayList<>())
+          .add(param);
+    }
+    return grouped;
+  }
+
   private TSchemaPartitionReq constructSchemaPartitionReq(PathPatternTree patternTree) {
     PublicBAOS baos = new PublicBAOS();
     try {
@@ -208,8 +340,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     }
     return new SchemaPartition(
         schemaPartitionMap,
-        IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-        IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+        config.getSeriesPartitionExecutorClass(),
+        config.getSeriesPartitionSlotNum());
   }
 
   private DataPartition parseDataPartitionResp(TDataPartitionResp dataPartitionResp) {
@@ -245,7 +377,239 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     }
     return new DataPartition(
         dataPartitionMap,
-        IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-        IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+        config.getSeriesPartitionExecutorClass(),
+        config.getSeriesPartitionSlotNum());
+  }
+
+  private class PartitionCache {
+    /** the size of partitionCache */
+    private final int cacheSize = config.getPartitionCacheSize();
+    /** the cache of storage group */
+    private Set<String> storageGroupCache = Collections.synchronizedSet(new HashSet<>());
+    /** device -> tRegionReplicaSet */
+    private final Cache<String, TRegionReplicaSet> schemaPartitionCache;
+    /** (seriesPartitionSlot, timePartitionSlot) -> regionReplicaSets */
+    private final Cache<DataPartitionCacheKey, List<TRegionReplicaSet>> dataPartitionCache;
+    /** class name of the executor that calculates the series partition slot of a device */
+    private final String seriesSlotExecutorName;
+
+    private final int seriesPartitionSlotNum;
+
+    /**
+     * Creates empty schema and data partition caches, each built with a maximum size of {@code
+     * cacheSize}.
+     *
+     * @param seriesSlotExecutorName executor class name passed on to the partitions built from
+     *     this cache
+     * @param seriesPartitionSlotNum series partition slot number passed on likewise
+     */
+    public PartitionCache(String seriesSlotExecutorName, int seriesPartitionSlotNum) {
+      this.seriesSlotExecutorName = seriesSlotExecutorName;
+      this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+      this.schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+      this.dataPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+    }
+
+    /**
+     * Resolves each device path to a storage group held in the cache.
+     *
+     * <p>Every device that has a cached storage group as prefix is added to {@code
+     * deviceToStorageGroupMap}; devices without one are left out, so the caller can tell which
+     * devices still need a storage group.
+     *
+     * @param devicePaths device paths to resolve
+     * @param deviceToStorageGroupMap output map, device path -> storage group name
+     * @return true only if every device path was resolved; false if the cache is empty or at
+     *     least one device has no cached storage group
+     */
+    public boolean getStorageGroup(
+        List<String> devicePaths, Map<String, String> deviceToStorageGroupMap) {
+      boolean allHit = true;
+      // Iterating a Collections.synchronizedSet is only safe while holding its monitor.
+      synchronized (storageGroupCache) {
+        if (storageGroupCache.isEmpty()) {
+          logger.debug("Failed to get storage group");
+          return false;
+        }
+        for (String devicePath : devicePaths) {
+          String matched = null;
+          for (String storageGroup : storageGroupCache) {
+            // A storage group that does not match is not a miss: keep scanning the others.
+            // NOTE(review): plain prefix matching also accepts "root.sg1" for "root.sg10.d1";
+            // confirm whether a path-separator boundary check is needed here.
+            if (devicePath.startsWith(storageGroup)) {
+              matched = storageGroup;
+              break;
+            }
+          }
+          if (matched == null) {
+            allHit = false;
+          } else {
+            deviceToStorageGroupMap.put(devicePath, matched);
+          }
+        }
+      }
+      if (allHit) {
+        logger.debug("Hit storage group");
+      } else {
+        logger.debug("Failed to get storage group cache");
+      }
+      return allHit;
+    }
+
+    /** Adds the given storage group names to the cache; names already cached are unaffected. */
+    public void updateStorageCache(Set<String> storageGroupNames) {
+      storageGroupCache.addAll(storageGroupNames);
+    }
+
+    /** Drops the given storage group names from the cache, e.g. after they were deleted. */
+    public void invalidStorageGroupCache(List<String> storageGroupNames) {
+      storageGroupNames.forEach(storageGroupCache::remove);
+    }
+
+    /**
+     * Assembles a schema partition from cached entries only.
+     *
+     * @param deviceToStorageGroupMap device path -> storage group name
+     * @return the schema partition covering every given device, or null (cache miss) as soon as
+     *     one device has no cached region replica set
+     */
+    public SchemaPartition getSchemaPartition(Map<String, String> deviceToStorageGroupMap) {
+      Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> assembled = new HashMap<>();
+      for (Map.Entry<String, String> deviceAndSg : deviceToStorageGroupMap.entrySet()) {
+        String devicePath = deviceAndSg.getKey();
+        TSeriesPartitionSlot slot = partitionExecutor.getSeriesPartitionSlot(devicePath);
+        TRegionReplicaSet replicaSet = schemaPartitionCache.getIfPresent(devicePath);
+        if (replicaSet == null) {
+          // a single uncached device turns the whole lookup into a miss
+          logger.debug("Failed to find schema partition");
+          return null;
+        }
+        assembled
+            .computeIfAbsent(deviceAndSg.getValue(), sg -> new HashMap<>())
+            .put(slot, replicaSet);
+      }
+      logger.debug("Hit schema partition");
+      return new SchemaPartition(assembled, seriesSlotExecutorName, seriesPartitionSlotNum);
+    }
+
+    /**
+     * Assembles a data partition from cached entries only.
+     *
+     * @param sgNameToQueryParamsMap storage group name -> query params of devices in that group
+     * @return the data partition, or null (cache miss) if a query param carries no time partition
+     *     slots or any requested (series slot, time slot) pair is not cached
+     */
+    public DataPartition getDataPartition(
+        Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          assembled = new HashMap<>();
+      for (Map.Entry<String, List<DataPartitionQueryParam>> sgAndParams :
+          sgNameToQueryParamsMap.entrySet()) {
+        Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> bySeriesSlot =
+            assembled.computeIfAbsent(sgAndParams.getKey(), sg -> new HashMap<>());
+        for (DataPartitionQueryParam param : sgAndParams.getValue()) {
+          if (param.getTimePartitionSlotList() == null
+              || param.getTimePartitionSlotList().isEmpty()) {
+            // no explicit time partitions means "query all data", which is treated as a miss
+            logger.debug("Failed to find data partition");
+            return null;
+          }
+          TSeriesPartitionSlot seriesSlot =
+              partitionExecutor.getSeriesPartitionSlot(param.getDevicePath());
+          Map<TTimePartitionSlot, List<TRegionReplicaSet>> byTimeSlot =
+              bySeriesSlot.computeIfAbsent(seriesSlot, slot -> new HashMap<>());
+          for (TTimePartitionSlot timeSlot : param.getTimePartitionSlotList()) {
+            List<TRegionReplicaSet> replicaSets =
+                dataPartitionCache.getIfPresent(new DataPartitionCacheKey(seriesSlot, timeSlot));
+            if (replicaSets == null) {
+              // one uncached time partition turns the whole lookup into a miss
+              logger.debug("Failed to find data partition");
+              return null;
+            }
+            byTimeSlot.put(timeSlot, replicaSets);
+          }
+        }
+      }
+      logger.debug("Hit data partition");
+      return new DataPartition(assembled, seriesSlotExecutorName, seriesPartitionSlotNum);
+    }
+
+    /**
+     * Stores, for each device, the region replica set that the given schema partition assigns to
+     * it. Devices whose storage group or region replica set cannot be found in the schema
+     * partition are logged and skipped.
+     *
+     * @param devices device paths to cache an entry for
+     * @param schemaPartition schema partition to take the region replica sets from
+     */
+    public void updateSchemaPartitionCache(List<String> devices, SchemaPartition schemaPartition) {
+      Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> partitionsBySg =
+          schemaPartition.getSchemaPartitionMap();
+      for (String device : devices) {
+        // the first storage group of the partition that prefixes the device path wins
+        String owningSg =
+            partitionsBySg.keySet().stream().filter(device::startsWith).findFirst().orElse(null);
+        if (owningSg == null) {
+          logger.error(
+              "Failed to get the storage group of {} when update SchemaPartitionCache", device);
+          continue;
+        }
+        TSeriesPartitionSlot slot = partitionExecutor.getSeriesPartitionSlot(device);
+        TRegionReplicaSet replicaSet = partitionsBySg.get(owningSg).get(slot);
+        if (replicaSet == null) {
+          logger.error(
+              "Failed to get the regionReplicaSet of {} when update SchemaPartitionCache", device);
+          continue;
+        }
+        schemaPartitionCache.put(device, replicaSet);
+      }
+    }
+
+    /**
+     * Stores every (series partition slot, time partition slot) -> region replica sets mapping of
+     * the given data partition in the cache. The storage group level of the partition map is not
+     * part of the cache key.
+     *
+     * @param dataPartition data partition whose mappings are cached
+     */
+    public void updateDataPartitionCache(DataPartition dataPartition) {
+      dataPartition
+          .getDataPartitionMap()
+          .values()
+          .forEach(
+              bySeriesSlot ->
+                  bySeriesSlot.forEach(
+                      (seriesSlot, byTimeSlot) ->
+                          byTimeSlot.forEach(
+                              (timeSlot, replicaSets) ->
+                                  dataPartitionCache.put(
+                                      new DataPartitionCacheKey(seriesSlot, timeSlot),
+                                      replicaSets))));
+    }
+
+    /**
+     * Invalidates the cached schema partition entry of a single device.
+     *
+     * @param device device path whose cache entry is dropped
+     */
+    public void invalidSchemaPartitionCache(String device) {
+      // TODO should be called in two situations: 1. redirect status 2. config node trigger
+      schemaPartitionCache.invalidate(device);
+    }
+
+    /**
+     * Invalidates the cached data partition entry of one (series partition slot, time partition
+     * slot) pair.
+     *
+     * @param seriesPartitionSlot series partition slot of the entry to drop
+     * @param timePartitionSlot time partition slot of the entry to drop
+     */
+    public void invalidDataPartitionCache(
+        TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
+      // TODO should be called in two situation: 1. redirect status 2. config node trigger
+      dataPartitionCache.invalidate(
+          new DataPartitionCacheKey(seriesPartitionSlot, timePartitionSlot));
+    }
+  }
+
+  /**
+   * Key of the data partition cache: a (series partition slot, time partition slot) pair.
+   *
+   * <p>Declared static so that keys stored in the cache do not carry a hidden reference to the
+   * enclosing fetcher, and with final fields because the fields feed {@link #hashCode()} and must
+   * not change once the key is stored.
+   */
+  private static final class DataPartitionCacheKey {
+    private final TSeriesPartitionSlot seriesPartitionSlot;
+    private final TTimePartitionSlot timePartitionSlot;
+
+    public DataPartitionCacheKey(
+        TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
+      this.seriesPartitionSlot = seriesPartitionSlot;
+      this.timePartitionSlot = timePartitionSlot;
+    }
+
+    /** Two keys are equal when both their series and time partition slots are equal. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      DataPartitionCacheKey that = (DataPartitionCacheKey) o;
+      return Objects.equals(seriesPartitionSlot, that.seriesPartitionSlot)
+          && Objects.equals(timePartitionSlot, that.timePartitionSlot);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(seriesPartitionSlot, timePartitionSlot);
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
index eb0d8bec34..a60acbb810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -139,9 +139,20 @@ public class FakePartitionFetcherImpl implements IPartitionFetcher {
     return dataPartition;
   }
 
+  /** Stub: the list-based lookup is not implemented by this fake fetcher and returns null. */
+  @Override
+  public DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    return null;
+  }
+
   @Override
   public DataPartition getOrCreateDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     return null;
   }
+
+  /** Stub: the list-based lookup is not implemented by this fake fetcher and returns null. */
+  @Override
+  public DataPartition getOrCreateDataPartition(
+      List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
index aa00f68f0b..c49d3ea28c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
@@ -34,6 +34,10 @@ public interface IPartitionFetcher {
 
   DataPartition getDataPartition(Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
 
+  DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams);
+
   DataPartition getOrCreateDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap);
+
+  DataPartition getOrCreateDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index ceacc51b6c..9cc05807b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -108,9 +108,20 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
     }
   }
 
+  /** Stub: the list-based lookup is not implemented for standalone mode and returns null. */
+  @Override
+  public DataPartition getDataPartition(List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    return null;
+  }
+
   @Override
   public DataPartition getOrCreateDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     return getDataPartition(sgNameToQueryParamsMap);
   }
+
+  /** Stub: the list-based lookup is not implemented for standalone mode and returns null. */
+  @Override
+  public DataPartition getOrCreateDataPartition(
+      List<DataPartitionQueryParam> dataPartitionQueryParams) {
+    return null;
+  }
 }