You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/19 01:18:53 UTC

[iotdb] branch master updated: [IOTDB-4815] Apply SchemaCache for explicit timeseries query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bf49535e88 [IOTDB-4815] Apply SchemaCache for explicit timeseries query
bf49535e88 is described below

commit bf49535e88090cf9471fbda5ea22f499604974b7
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Sat Nov 19 09:18:46 2022 +0800

    [IOTDB-4815] Apply SchemaCache for explicit timeseries query
---
 .../db/metadata/cache/DataNodeSchemaCache.java     |  47 +++++--
 .../iotdb/db/metadata/cache/SchemaCacheEntry.java  |  12 +-
 .../mpp/common/schematree/ClusterSchemaTree.java   |  23 ++--
 .../common/schematree/DeviceGroupSchemaTree.java   |   7 +-
 .../db/mpp/common/schematree/ISchemaTree.java      |   7 +-
 .../execution/executor/RegionWriteExecutor.java    |  11 +-
 .../process/last/UpdateLastCacheOperator.java      |  15 ++-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   9 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 149 +++++++++++++++------
 .../mpp/plan/analyze/StandaloneSchemaFetcher.java  |   2 +-
 .../db/metadata/cache/DataNodeSchemaCacheTest.java |   7 +-
 .../mpp/execution/operator/OperatorMemoryTest.java |   2 +-
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |   2 +-
 .../iotdb/db/mpp/plan/plan/distribution/Util.java  |   2 +-
 14 files changed, 210 insertions(+), 85 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index bec73aeda7..c5614e34eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -34,6 +34,9 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -100,34 +103,60 @@ public class DataNodeSchemaCache {
    */
   public ClusterSchemaTree get(PartialPath devicePath, String[] measurements) {
     ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    Set<String> storageGroupSet = new HashSet<>();
     SchemaCacheEntry schemaCacheEntry;
     for (String measurement : measurements) {
       PartialPath path = devicePath.concatNode(measurement);
       schemaCacheEntry = cache.getIfPresent(path);
       if (schemaCacheEntry != null) {
         schemaTree.appendSingleMeasurement(
-            devicePath.concatNode(
-                schemaCacheEntry.getSchemaEntryId()), // the cached path may be alias path
+            devicePath.concatNode(schemaCacheEntry.getSchemaEntryId()),
             schemaCacheEntry.getMeasurementSchema(),
             schemaCacheEntry.getTagMap(),
             null,
             schemaCacheEntry.isAligned());
+        storageGroupSet.add(schemaCacheEntry.getStorageGroup());
       }
     }
+    schemaTree.setDatabases(storageGroupSet);
+    return schemaTree;
+  }
+
+  public ClusterSchemaTree get(PartialPath fullPath) {
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    SchemaCacheEntry schemaCacheEntry = cache.getIfPresent(fullPath);
+    if (schemaCacheEntry != null) {
+      schemaTree.appendSingleMeasurement(
+          fullPath,
+          schemaCacheEntry.getMeasurementSchema(),
+          schemaCacheEntry.getTagMap(),
+          null,
+          schemaCacheEntry.isAligned());
+      schemaTree.setDatabases(Collections.singleton(schemaCacheEntry.getStorageGroup()));
+    }
     return schemaTree;
   }
 
   public void put(ISchemaTree schemaTree) {
     for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
-      SchemaCacheEntry schemaCacheEntry =
-          new SchemaCacheEntry(
-              (MeasurementSchema) measurementPath.getMeasurementSchema(),
-              measurementPath.getTagMap(),
-              measurementPath.isUnderAlignedEntity());
-      cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+      putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath), measurementPath);
     }
   }
 
+  public void put(String storageGroup, MeasurementPath measurementPath) {
+    putSingleMeasurementPath(storageGroup, measurementPath);
+  }
+
+  private void putSingleMeasurementPath(String storageGroup, MeasurementPath measurementPath) {
+    SchemaCacheEntry schemaCacheEntry =
+        new SchemaCacheEntry(
+            storageGroup,
+            (MeasurementSchema) measurementPath.getMeasurementSchema(),
+            measurementPath.getTagMap(),
+            measurementPath.isUnderAlignedEntity());
+    cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+  }
+
   public TimeValuePair getLastCache(PartialPath seriesPath) {
     SchemaCacheEntry entry = cache.getIfPresent(seriesPath);
     if (null == entry) {
@@ -157,6 +186,7 @@ public class DataNodeSchemaCache {
    * aligned sensor without only one sub sensor
    */
   public void updateLastCache(
+      String storageGroup,
       MeasurementPath measurementPath,
       TimeValuePair timeValuePair,
       boolean highPriorityUpdate,
@@ -169,6 +199,7 @@ public class DataNodeSchemaCache {
         if (null == entry) {
           entry =
               new SchemaCacheEntry(
+                  storageGroup,
                   (MeasurementSchema) measurementPath.getMeasurementSchema(),
                   measurementPath.getTagMap(),
                   measurementPath.isUnderAlignedEntity());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index 9b0fa929d7..66c49e3dc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -28,6 +28,8 @@ import java.util.Map;
 
 public class SchemaCacheEntry {
 
+  private final String storageGroup;
+
   private final MeasurementSchema measurementSchema;
 
   private final Map<String, String> tagMap;
@@ -36,7 +38,11 @@ public class SchemaCacheEntry {
   private volatile ILastCacheContainer lastCacheContainer = null;
 
   SchemaCacheEntry(
-      MeasurementSchema measurementSchema, Map<String, String> tagMap, boolean isAligned) {
+      String storageGroup,
+      MeasurementSchema measurementSchema,
+      Map<String, String> tagMap,
+      boolean isAligned) {
+    this.storageGroup = storageGroup.intern();
     this.measurementSchema = measurementSchema;
     this.isAligned = isAligned;
     this.tagMap = tagMap;
@@ -46,6 +52,10 @@ public class SchemaCacheEntry {
     return measurementSchema.getMeasurementId();
   }
 
+  public String getStorageGroup() {
+    return storageGroup;
+  }
+
   public MeasurementSchema getMeasurementSchema() {
     return measurementSchema;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 1e52edb485..d1581176ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
 import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_MATCH_PATTERN;
@@ -50,7 +51,7 @@ import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_M
 
 public class ClusterSchemaTree implements ISchemaTree {
 
-  private List<String> storageGroups;
+  private Set<String> databases;
 
   private final SchemaNode root;
 
@@ -292,27 +293,27 @@ public class ClusterSchemaTree implements ISchemaTree {
    * @return database in the given path
    */
   @Override
-  public String getBelongedStorageGroup(String pathName) {
-    for (String storageGroup : storageGroups) {
-      if (PathUtils.isStartWith(pathName, storageGroup)) {
-        return storageGroup;
+  public String getBelongedDatabase(String pathName) {
+    for (String database : databases) {
+      if (PathUtils.isStartWith(pathName, database)) {
+        return database;
       }
     }
     throw new RuntimeException("No matched database. Please check the path " + pathName);
   }
 
   @Override
-  public String getBelongedStorageGroup(PartialPath path) {
-    return getBelongedStorageGroup(path.getFullPath());
+  public String getBelongedDatabase(PartialPath path) {
+    return getBelongedDatabase(path.getFullPath());
   }
 
   @Override
-  public List<String> getStorageGroups() {
-    return storageGroups;
+  public Set<String> getDatabases() {
+    return databases;
   }
 
-  public void setStorageGroups(List<String> storageGroups) {
-    this.storageGroups = storageGroups;
+  public void setDatabases(Set<String> databases) {
+    this.databases = databases;
   }
 
   @TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
index 1c739ea9fe..a8a64f38f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceGroupSchemaTree.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This class is specifically for standalone schema validation during data insertion. Since the
@@ -82,17 +83,17 @@ public class DeviceGroupSchemaTree implements ISchemaTree {
   }
 
   @Override
-  public String getBelongedStorageGroup(String pathName) {
+  public String getBelongedDatabase(String pathName) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public String getBelongedStorageGroup(PartialPath path) {
+  public String getBelongedDatabase(PartialPath path) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public List<String> getStorageGroups() {
+  public Set<String> getDatabases() {
     throw new UnsupportedOperationException();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java
index acd383286a..010443a63a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ISchemaTree.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.List;
+import java.util.Set;
 
 public interface ISchemaTree {
   /**
@@ -60,11 +61,11 @@ public interface ISchemaTree {
    * @param pathName only full path, cannot be path pattern
    * @return database in the given path
    */
-  String getBelongedStorageGroup(String pathName);
+  String getBelongedDatabase(String pathName);
 
-  String getBelongedStorageGroup(PartialPath path);
+  String getBelongedDatabase(PartialPath path);
 
-  List<String> getStorageGroups();
+  Set<String> getDatabases();
 
   boolean isEmpty();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index e63309b26c..0d1d7804c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -32,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
@@ -423,7 +425,10 @@ public class RegionWriteExecutor {
                   failingMeasurement.getValue().getMessage());
               alreadyExistingStatus.add(
                   RpcUtils.getStatus(
-                      metadataException.getErrorCode(), metadataException.getMessage()));
+                      metadataException.getErrorCode(),
+                      MeasurementPath.transformDataToString(
+                          ((MeasurementAlreadyExistException) metadataException)
+                              .getMeasurementPath())));
             } else {
               LOGGER.error("Metadata error: ", metadataException);
               failingStatus.add(
@@ -469,9 +474,9 @@ public class RegionWriteExecutor {
 
           TSStatus status;
           if (failingStatus.isEmpty()) {
-            status = RpcUtils.getStatus(failingStatus);
-          } else {
             status = RpcUtils.getStatus(alreadyExistingStatus);
+          } else {
+            status = RpcUtils.getStatus(failingStatus);
           }
 
           RegionExecutionResult result = new RegionExecutionResult();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index abd43eb5e0..8afd9b99de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
@@ -58,6 +59,8 @@ public class UpdateLastCacheOperator implements ProcessOperator {
 
   private final TsBlockBuilder tsBlockBuilder;
 
+  private String databaseName;
+
   public UpdateLastCacheOperator(
       OperatorContext operatorContext,
       Operator child,
@@ -106,7 +109,7 @@ public class UpdateLastCacheOperator implements ProcessOperator {
 
     if (needUpdateCache) {
       TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
-      lastCache.updateLastCache(fullPath, timeValuePair, false, Long.MIN_VALUE);
+      lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE);
     }
 
     tsBlockBuilder.reset();
@@ -117,6 +120,16 @@ public class UpdateLastCacheOperator implements ProcessOperator {
     return tsBlockBuilder.build();
   }
 
+  private String getDatabaseName() {
+    if (databaseName == null) {
+      databaseName =
+          ((DataDriverContext) operatorContext.getInstanceContext().getDriverContext())
+              .getDataRegion()
+              .getStorageGroupName();
+    }
+    return databaseName;
+  }
+
   @Override
   public boolean hasNext() {
     return child.hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 7028955335..19169af3b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -359,7 +359,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
       queryParam.setDevicePath(devicePath);
       sgNameToQueryParamsMap
-          .computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+          .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
           .add(queryParam);
     }
     DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
@@ -1104,7 +1104,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
       queryParam.setDevicePath(devicePath);
       sgNameToQueryParamsMap
-          .computeIfAbsent(schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+          .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
           .add(queryParam);
     }
     return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
@@ -1914,8 +1914,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
         queryParam.setDevicePath(devicePath);
         sgNameToQueryParamsMap
-            .computeIfAbsent(
-                schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+            .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
             .add(queryParam);
       }
       DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
@@ -2152,7 +2151,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
               queryParam.setDevicePath(devicePath.getFullPath());
               sgNameToQueryParamsMap
                   .computeIfAbsent(
-                      schemaTree.getBelongedStorageGroup(devicePath), key -> new ArrayList<>())
+                      schemaTree.getBelongedDatabase(devicePath), key -> new ArrayList<>())
                   .add(queryParam);
             });
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 31cc8c8deb..16a49ae47a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -106,10 +106,69 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
   private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) {
     Map<Integer, Template> templateMap = new HashMap<>();
     patternTree.constructTree();
-    for (PartialPath pattern : patternTree.getAllPathPatterns()) {
+    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+    for (PartialPath pattern : pathPatternList) {
       templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
     }
-    return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+
+    if (withTags) {
+      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+    }
+
+    List<PartialPath> fullPathList = new ArrayList<>();
+    for (PartialPath pattern : pathPatternList) {
+      if (!pattern.hasWildcard()) {
+        fullPathList.add(pattern);
+      }
+    }
+
+    if (fullPathList.isEmpty()) {
+      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+    }
+
+    // The schema cache R/W and fetch operation must be locked together thus the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    try {
+      ClusterSchemaTree schemaTree;
+      if (fullPathList.size() == pathPatternList.size()) {
+        boolean isAllCached = true;
+        schemaTree = new ClusterSchemaTree();
+        ClusterSchemaTree cachedSchema;
+        Set<String> storageGroupSet = new HashSet<>();
+        for (PartialPath fullPath : fullPathList) {
+          cachedSchema = schemaCache.get(fullPath);
+          if (cachedSchema.isEmpty()) {
+            isAllCached = false;
+            break;
+          } else {
+            schemaTree.mergeSchemaTree(cachedSchema);
+            storageGroupSet.addAll(cachedSchema.getDatabases());
+          }
+        }
+        if (isAllCached) {
+          schemaTree.setDatabases(storageGroupSet);
+          return schemaTree;
+        }
+      }
+
+      schemaTree =
+          executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+
+      // only cache the schema fetched by full path
+      List<MeasurementPath> measurementPathList;
+      for (PartialPath fullPath : fullPathList) {
+        measurementPathList = schemaTree.searchMeasurementPaths(fullPath).left;
+        if (measurementPathList.isEmpty()) {
+          continue;
+        }
+        schemaCache.put(
+            schemaTree.getBelongedDatabase(measurementPathList.get(0)), measurementPathList.get(0));
+      }
+      return schemaTree;
+    } finally {
+      schemaCache.releaseReadLock();
+    }
   }
 
   private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
@@ -132,7 +191,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
       try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
         ClusterSchemaTree result = new ClusterSchemaTree();
-        List<String> storageGroupList = new ArrayList<>();
+        Set<String> databaseSet = new HashSet<>();
         while (coordinator.getQueryExecution(queryId).hasNextResult()) {
           // The query will be transited to FINISHED when invoking getBatchResult() at the last time
           // So we don't need to clean up it manually
@@ -147,10 +206,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           }
           Column column = tsBlock.get().getColumn(0);
           for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, storageGroupList);
+            parseFetchedData(column.getBinary(i), result, databaseSet);
           }
         }
-        result.setStorageGroups(storageGroupList);
+        result.setDatabases(databaseSet);
         return result;
       }
     } finally {
@@ -159,14 +218,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
   }
 
   private void parseFetchedData(
-      Binary data, ClusterSchemaTree resultSchemaTree, List<String> storageGroupList) {
+      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
     InputStream inputStream = new ByteArrayInputStream(data.getValues());
     try {
       byte type = ReadWriteIOUtils.readByte(inputStream);
       if (type == 0) {
         int size = ReadWriteIOUtils.readInt(inputStream);
         for (int i = 0; i < size; i++) {
-          storageGroupList.add(ReadWriteIOUtils.readString(inputStream));
+          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
         }
       } else if (type == 1) {
         resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
@@ -185,21 +244,24 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       String[] measurements,
       Function<Integer, TSDataType> getDataType,
       boolean isAligned) {
+    // The schema cache R/W and fetch operation must be locked together thus the cache clean
+    // operation executed by delete timeseries will be effective.
     schemaCache.takeReadLock();
     try {
       ClusterSchemaTree schemaTree = schemaCache.get(devicePath, measurements);
       List<Integer> indexOfMissingMeasurements =
           checkMissingMeasurements(schemaTree, devicePath, measurements);
 
+      // all schema can be taken from cache
       if (indexOfMissingMeasurements.isEmpty()) {
         return schemaTree;
       }
 
+      // try fetch the missing schema from remote and cache fetched schema
       PathPatternTree patternTree = new PathPatternTree();
       for (int index : indexOfMissingMeasurements) {
         patternTree.appendFullPath(devicePath, measurements[index]);
       }
-
       ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
@@ -210,19 +272,16 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         return schemaTree;
       }
 
-      ClusterSchemaTree missingSchemaTree =
-          checkAndAutoCreateMissingMeasurements(
-              remoteSchemaTree,
-              devicePath,
-              indexOfMissingMeasurements,
-              measurements,
-              getDataType,
-              null,
-              null,
-              isAligned);
-
-      schemaTree.mergeSchemaTree(missingSchemaTree);
-      schemaCache.put(missingSchemaTree);
+      // auto create the still missing schema and merge them into schemaTree
+      checkAndAutoCreateMissingMeasurements(
+          schemaTree,
+          devicePath,
+          indexOfMissingMeasurements,
+          measurements,
+          getDataType,
+          null,
+          null,
+          isAligned);
 
       return schemaTree;
     } finally {
@@ -248,8 +307,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       List<TSEncoding[]> encodingsList,
       List<CompressionType[]> compressionTypesList,
       List<Boolean> isAlignedList) {
+    // The schema cache R/W and fetch operation must be locked together thus the cache clean
+    // operation executed by delete timeseries will be effective.
     schemaCache.takeReadLock();
-
     try {
       ClusterSchemaTree schemaTree = new ClusterSchemaTree();
       PathPatternTree patternTree = new PathPatternTree();
@@ -264,10 +324,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         }
       }
 
+      // all schema can be taken from cache
       if (patternTree.isEmpty()) {
         return schemaTree;
       }
 
+      // try fetch the missing schema from remote and cache fetched schema
       ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
@@ -278,21 +340,18 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         return schemaTree;
       }
 
-      ClusterSchemaTree missingSchemaTree;
+      // auto create the still missing schema and merge them into schemaTree
       for (int i = 0; i < devicePathList.size(); i++) {
         int finalI = i;
-        missingSchemaTree =
-            checkAndAutoCreateMissingMeasurements(
-                schemaTree,
-                devicePathList.get(i),
-                indexOfMissingMeasurementsList.get(i),
-                measurementsList.get(i),
-                index -> tsDataTypesList.get(finalI)[index],
-                encodingsList == null ? null : encodingsList.get(i),
-                compressionTypesList == null ? null : compressionTypesList.get(i),
-                isAlignedList.get(i));
-        schemaTree.mergeSchemaTree(missingSchemaTree);
-        schemaCache.put(missingSchemaTree);
+        checkAndAutoCreateMissingMeasurements(
+            schemaTree,
+            devicePathList.get(i),
+            indexOfMissingMeasurementsList.get(i),
+            measurementsList.get(i),
+            index -> tsDataTypesList.get(finalI)[index],
+            encodingsList == null ? null : encodingsList.get(i),
+            compressionTypesList == null ? null : compressionTypesList.get(i),
+            isAlignedList.get(i));
       }
       return schemaTree;
     } finally {
@@ -315,7 +374,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     return templateManager.getAllPathsSetTemplate(templateName);
   }
 
-  private ClusterSchemaTree checkAndAutoCreateMissingMeasurements(
+  // check which measurements are missing and auto create the missing measurements and merge them
+  // into given schemaTree
+  private void checkAndAutoCreateMissingMeasurements(
       ClusterSchemaTree schemaTree,
       PartialPath devicePath,
       List<Integer> indexOfMissingMeasurements,
@@ -324,6 +385,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       TSEncoding[] encodings,
       CompressionType[] compressionTypes,
       boolean isAligned) {
+    // check missing measurements
     DeviceSchemaInfo deviceSchemaInfo =
         schemaTree.searchDeviceSchemaInfo(
             devicePath,
@@ -340,13 +402,11 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         }
       }
     }
-
-    ClusterSchemaTree reFetchedSchemaTree = new ClusterSchemaTree();
-
     if (indexOfMissingMeasurements.isEmpty()) {
-      return reFetchedSchemaTree;
+      return;
     }
 
+    // check whether there is template should be activated
     Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
     if (templateInfo != null) {
       Template template = templateInfo.left;
@@ -377,11 +437,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         }
 
         if (indexOfMissingMeasurements.isEmpty()) {
-          return schemaTree;
+          return;
         }
       }
     }
 
+    // auto create the rest missing timeseries
     List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
     List<TSDataType> dataTypesOfMissingMeasurement =
         new ArrayList<>(indexOfMissingMeasurements.size());
@@ -416,7 +477,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
               compressionTypesOfMissingMeasurement,
               isAligned));
     }
-    return schemaTree;
   }
 
   private List<Integer> checkMissingMeasurements(
@@ -438,6 +498,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     return indexOfMissingMeasurements;
   }
 
+  // try to create the target timeseries and return schemaTree involving successfully created
+  // timeseries and existing timeseries
   private ClusterSchemaTree internalCreateTimeseries(
       PartialPath devicePath,
       List<String> measurements,
@@ -472,11 +534,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           isAligned);
     }
 
-    schemaCache.put(schemaTree);
-
     return schemaTree;
   }
 
+  // auto create timeseries and return the existing timeseries info
   private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
       InternalCreateTimeSeriesStatement statement) {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index 72f10eaa87..f2ab109b79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -88,7 +88,7 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
     } catch (MetadataException e) {
       throw new RuntimeException(e);
     }
-    schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+    schemaTree.setDatabases(storageGroupSet);
     return schemaTree;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index a581cdd867..bee7c16e6b 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -70,6 +71,7 @@ public class DataNodeSchemaCacheTest {
                     o -> new PartialPath(o.getNodes()),
                     o ->
                         new SchemaCacheEntry(
+                            "root.sg1",
                             (MeasurementSchema) o.getMeasurementSchema(),
                             o.getTagMap(),
                             o.isUnderAlignedEntity())));
@@ -102,6 +104,7 @@ public class DataNodeSchemaCacheTest {
                     o -> new PartialPath(o.getNodes()),
                     o ->
                         new SchemaCacheEntry(
+                            "root.sg1",
                             (MeasurementSchema) o.getMeasurementSchema(),
                             o.getTagMap(),
                             o.isUnderAlignedEntity())));
@@ -209,7 +212,7 @@ public class DataNodeSchemaCacheTest {
         null,
         null,
         false);
-
+    schemaTree.setDatabases(Collections.singleton("root.sg1"));
     return schemaTree;
   }
 
@@ -234,7 +237,7 @@ public class DataNodeSchemaCacheTest {
         null,
         null,
         false);
-
+    schemaTree.setDatabases(Collections.singleton("root.sg1"));
     return schemaTree;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index c1d7728804..9a3f34b3a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -492,7 +492,7 @@ public class OperatorMemoryTest {
     Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
 
     UpdateLastCacheOperator updateLastCacheOperator =
-        new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, true);
+        new UpdateLastCacheOperator(null, child, null, TSDataType.BOOLEAN, null, false);
 
     assertEquals(2048, updateLastCacheOperator.calculateMaxPeekMemory());
     assertEquals(1024, updateLastCacheOperator.calculateMaxReturnSize());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 1a121354ca..61951feee4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -45,7 +45,7 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
 
   @Override
   public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
-    schemaTree.setStorageGroups(Collections.singletonList("root.sg"));
+    schemaTree.setDatabases(Collections.singleton("root.sg"));
     return schemaTree;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index e97d6340cd..654e987931 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -275,7 +275,7 @@ public class Util {
     d6.addChild("s2", s2);
 
     ClusterSchemaTree tree = new ClusterSchemaTree(root);
-    tree.setStorageGroups(Collections.singletonList("root.sg"));
+    tree.setDatabases(Collections.singleton("root.sg"));
 
     return tree;
   }