You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/26 02:03:12 UTC

[iotdb] branch master updated: Support update last cache for data insertion when using template (#9696)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b6f78dcee2 Support update last cache for data insertion when using template (#9696)
b6f78dcee2 is described below

commit b6f78dcee2f79eceb7a224102c17d125de7d98a6
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Apr 26 10:03:05 2023 +0800

    Support update last cache for data insertion when using template (#9696)
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  62 +++++++------
 .../db/metadata/cache/DataNodeSchemaCache.java     |  24 +++++
 .../db/metadata/cache/TimeSeriesSchemaCache.java   |  57 ++++++++++++
 .../metadata/cache/dualkeycache/IDualKeyCache.java |   4 +
 .../dualkeycache/impl/CacheEntryGroupImpl.java     |   2 +-
 .../cache/dualkeycache/impl/DualKeyCacheImpl.java  | 102 ++++++++-------------
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   5 -
 .../db/mpp/plan/analyze/schema/ISchemaFetcher.java |   2 -
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 +-
 .../db/metadata/cache/DataNodeSchemaCacheTest.java |  50 ++++++++++
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |   3 -
 .../iotdb/db/mpp/plan/plan/distribution/Util.java  |   3 -
 12 files changed, 213 insertions(+), 108 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 56f3a5f5aa..bcb4078187 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1149,19 +1149,17 @@ public class DataRegion implements IDataRegionForQuery {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
     }
-    for (int i = 0; i < node.getColumns().length; i++) {
-      if (node.getColumns()[i] == null) {
-        continue;
-      }
-      // Update cached last value with high priority
-      DataNodeSchemaCache.getInstance()
-          .updateLastCache(
-              node.getDevicePath(),
-              node.getMeasurements()[i],
-              node.composeLastTimeValuePair(i),
-              true,
-              latestFlushedTime);
-    }
+    DataNodeSchemaCache.getInstance()
+        .updateLastCache(
+            getDatabaseName(),
+            node.getDevicePath(),
+            node.getMeasurements(),
+            node.getMeasurementSchemas(),
+            node.isAligned(),
+            node::composeLastTimeValuePair,
+            index -> node.getColumns()[index] != null,
+            true,
+            latestFlushedTime);
   }
 
   private void insertToTsFileProcessor(
@@ -1191,19 +1189,17 @@ public class DataRegion implements IDataRegionForQuery {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
     }
-    for (int i = 0; i < node.getValues().length; i++) {
-      if (node.getValues()[i] == null) {
-        continue;
-      }
-      // Update cached last value with high priority
-      DataNodeSchemaCache.getInstance()
-          .updateLastCache(
-              node.getDevicePath(),
-              node.getMeasurements()[i],
-              node.composeTimeValuePair(i),
-              true,
-              latestFlushedTime);
-    }
+    DataNodeSchemaCache.getInstance()
+        .updateLastCache(
+            getDatabaseName(),
+            node.getDevicePath(),
+            node.getMeasurements(),
+            node.getMeasurementSchemas(),
+            node.isAligned(),
+            node::composeTimeValuePair,
+            index -> node.getValues()[index] != null,
+            true,
+            latestFlushedTime);
   }
 
   /**
@@ -1903,7 +1899,12 @@ public class DataRegion implements IDataRegionForQuery {
 
       // delete Last cache record if necessary
       // todo implement more precise process
-      DataNodeSchemaCache.getInstance().invalidateAll();
+      DataNodeSchemaCache.getInstance().takeWriteLock();
+      try {
+        DataNodeSchemaCache.getInstance().invalidateAll();
+      } finally {
+        DataNodeSchemaCache.getInstance().releaseWriteLock();
+      }
 
       // write log to impacted working TsFileProcessors
       List<WALFlushListener> walListeners =
@@ -2312,7 +2313,12 @@ public class DataRegion implements IDataRegionForQuery {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
     }
-    DataNodeSchemaCache.getInstance().invalidateAll();
+    DataNodeSchemaCache.getInstance().takeWriteLock();
+    try {
+      DataNodeSchemaCache.getInstance().invalidateAll();
+    } finally {
+      DataNodeSchemaCache.getInstance().releaseWriteLock();
+    }
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 5b5d3d5865..a9a981088c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaComputation;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 
 /**
  * This class takes the responsibility of metadata cache management of all DataRegions under
@@ -173,6 +175,28 @@ public class DataNodeSchemaCache {
         devicePath, measurement, timeValuePair, highPriorityUpdate, latestFlushedTime);
   }
 
+  public void updateLastCache(
+      String database,
+      PartialPath devicePath,
+      String[] measurements,
+      MeasurementSchema[] measurementSchemas,
+      boolean isAligned,
+      Function<Integer, TimeValuePair> timeValuePairProvider,
+      Function<Integer, Boolean> shouldUpdateProvider,
+      boolean highPriorityUpdate,
+      Long latestFlushedTime) {
+    timeSeriesSchemaCache.updateLastCache(
+        database,
+        devicePath,
+        measurements,
+        measurementSchemas,
+        isAligned,
+        timeValuePairProvider,
+        shouldUpdateProvider,
+        highPriorityUpdate,
+        latestFlushedTime);
+  }
+
   /**
    * get or create SchemaCacheEntry and update last cache, only support non-aligned sensor or
    * aligned sensor without only one sub sensor
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index 2fccef50d1..d87527e2b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -41,6 +41,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 public class TimeSeriesSchemaCache {
 
@@ -200,6 +201,62 @@ public class TimeSeriesSchemaCache {
         entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
   }
 
+  /** Get or create the SchemaCacheEntry of each measurement under a device and update its last cache. */
+  public void updateLastCache(
+      String database,
+      PartialPath devicePath,
+      String[] measurements,
+      MeasurementSchema[] measurementSchemas,
+      boolean isAligned,
+      Function<Integer, TimeValuePair> timeValuePairProvider,
+      Function<Integer, Boolean> shouldUpdateProvider,
+      boolean highPriorityUpdate,
+      Long latestFlushedTime) {
+    SchemaCacheEntry entry;
+    List<Integer> missingMeasurements = new ArrayList<>();
+    dualKeyCache.compute(
+        new IDualKeyCacheComputation<PartialPath, String, SchemaCacheEntry>() {
+          @Override
+          public PartialPath getFirstKey() {
+            return devicePath;
+          }
+
+          @Override
+          public String[] getSecondKeyList() {
+            return measurements;
+          }
+
+          @Override
+          public void computeValue(int index, SchemaCacheEntry value) {
+            if (!shouldUpdateProvider.apply(index)) {
+              return;
+            }
+            if (value == null) {
+              missingMeasurements.add(index);
+            } else {
+              DataNodeLastCacheManager.updateLastCache(
+                  value, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime);
+            }
+          }
+        });
+
+    for (int index : missingMeasurements) {
+      entry = dualKeyCache.get(devicePath, measurements[index]);
+      if (entry == null) {
+        synchronized (dualKeyCache) {
+          entry = dualKeyCache.get(devicePath, measurements[index]);
+          if (null == entry) {
+            entry = new SchemaCacheEntry(database, measurementSchemas[index], null, isAligned);
+            dualKeyCache.put(devicePath, measurements[index], entry);
+          }
+        }
+      }
+
+      DataNodeLastCacheManager.updateLastCache(
+          entry, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime);
+    }
+  }
+
   /**
    * get or create SchemaCacheEntry and update last cache, only support non-aligned sensor or
    * aligned sensor without only one sub sensor
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
index 9f6fe21e6a..cb11dd6cb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/IDualKeyCache.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.metadata.cache.dualkeycache;
 
+import javax.annotation.concurrent.GuardedBy;
+
 /**
  * This interface defines the behaviour of a dual key cache. A dual key cache supports managing cache
  * values via two keys, first key and second key. Simply, the structure is like fk -> sk -> value.
@@ -45,11 +47,13 @@ public interface IDualKeyCache<FK, SK, V> {
    * Invalidate all cache values in the cache and clear related cache keys. The cache status and
    * statistics won't be cleared and they can still be accessed via cache.stats().
    */
+  @GuardedBy("DataNodeSchemaCache#writeLock")
   void invalidateAll();
 
   /**
    * Clean up all data and info of this cache, including cache keys, cache values and cache stats.
    */
+  @GuardedBy("DataNodeSchemaCache#writeLock")
   void cleanUp();
 
   /** Return all the current cache status and statistics. */
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
index 81aa331ae6..9b6b1b49ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -42,7 +42,7 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
 
   @Override
   public T getCacheEntry(SK secondKey) {
-    return cacheEntryMap.get(secondKey);
+    return secondKey == null ? null : cacheEntryMap.get(secondKey);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
index 0704e75f7c..270f3ec131 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/dualkeycache/impl/DualKeyCacheImpl.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.metadata.cache.dualkeycache.IDualKeyCacheStats;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
 
 class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
@@ -41,8 +40,6 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
 
   private final CacheStats cacheStats;
 
-  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
-
   DualKeyCacheImpl(
       ICacheEntryManager<FK, SK, V, T> cacheEntryManager,
       ICacheSizeComputer<FK, SK, V> sizeComputer,
@@ -54,72 +51,57 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
 
   @Override
   public V get(FK firstKey, SK secondKey) {
-    readWriteLock.readLock().lock();
-    try {
-      ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
-      if (cacheEntryGroup == null) {
+    ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
+    if (cacheEntryGroup == null) {
+      cacheStats.recordMiss(1);
+      return null;
+    } else {
+      T cacheEntry = cacheEntryGroup.getCacheEntry(secondKey);
+      if (cacheEntry == null) {
         cacheStats.recordMiss(1);
         return null;
       } else {
-        T cacheEntry = cacheEntryGroup.getCacheEntry(secondKey);
-        if (cacheEntry == null) {
-          cacheStats.recordMiss(1);
-          return null;
-        } else {
-          cacheEntryManager.access(cacheEntry);
-          cacheStats.recordHit(1);
-          return cacheEntry.getValue();
-        }
+        cacheEntryManager.access(cacheEntry);
+        cacheStats.recordHit(1);
+        return cacheEntry.getValue();
       }
-    } finally {
-      readWriteLock.readLock().unlock();
     }
   }
 
   @Override
   public void compute(IDualKeyCacheComputation<FK, SK, V> computation) {
-    readWriteLock.readLock().lock();
-    try {
-      FK firstKey = computation.getFirstKey();
-      ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
-      SK[] secondKeyList = computation.getSecondKeyList();
-      if (cacheEntryGroup == null) {
-        for (int i = 0; i < secondKeyList.length; i++) {
+    FK firstKey = computation.getFirstKey();
+    ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
+    SK[] secondKeyList = computation.getSecondKeyList();
+    if (cacheEntryGroup == null) {
+      for (int i = 0; i < secondKeyList.length; i++) {
+        computation.computeValue(i, null);
+      }
+      cacheStats.recordMiss(secondKeyList.length);
+    } else {
+      T cacheEntry;
+      int hitCount = 0;
+      for (int i = 0; i < secondKeyList.length; i++) {
+        cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
+        if (cacheEntry == null) {
           computation.computeValue(i, null);
+        } else {
+          computation.computeValue(i, cacheEntry.getValue());
+          cacheEntryManager.access(cacheEntry);
+          hitCount++;
         }
-        cacheStats.recordMiss(secondKeyList.length);
-      } else {
-        T cacheEntry;
-        int hitCount = 0;
-        for (int i = 0; i < secondKeyList.length; i++) {
-          cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
-          if (cacheEntry == null) {
-            computation.computeValue(i, null);
-          } else {
-            computation.computeValue(i, cacheEntry.getValue());
-            cacheEntryManager.access(cacheEntry);
-            hitCount++;
-          }
-        }
-        cacheStats.recordHit(hitCount);
-        cacheStats.recordMiss(secondKeyList.length - hitCount);
       }
-    } finally {
-      readWriteLock.readLock().unlock();
+      cacheStats.recordHit(hitCount);
+      cacheStats.recordMiss(secondKeyList.length - hitCount);
     }
   }
 
   @Override
   public void put(FK firstKey, SK secondKey, V value) {
-    readWriteLock.readLock().lock();
-    try {
-      int usedMemorySize = putToCache(firstKey, secondKey, value);
-      cacheStats.increaseMemoryUsage(usedMemorySize);
-      if (cacheStats.isExceedMemoryCapacity()) {
-        executeCacheEviction(usedMemorySize);
-      }
-    } finally {
-      readWriteLock.readLock().unlock();
+    int usedMemorySize = putToCache(firstKey, secondKey, value);
+    cacheStats.increaseMemoryUsage(usedMemorySize);
+    if (cacheStats.isExceedMemoryCapacity()) {
+      executeCacheEviction(usedMemorySize);
     }
   }
 
@@ -206,12 +188,7 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
 
   @Override
   public void invalidateAll() {
-    readWriteLock.writeLock().lock();
-    try {
-      executeInvalidateAll();
-    } finally {
-      readWriteLock.writeLock().unlock();
-    }
+    executeInvalidateAll();
   }
 
   private void executeInvalidateAll() {
@@ -222,13 +199,8 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
 
   @Override
   public void cleanUp() {
-    readWriteLock.writeLock().lock();
-    try {
-      executeInvalidateAll();
-      cacheStats.reset();
-    } finally {
-      readWriteLock.writeLock().unlock();
-    }
+    executeInvalidateAll();
+    cacheStats.reset();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 5bac0a831f..53fa5ee128 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -379,9 +379,4 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     return indexOfMissingMeasurements;
   }
-
-  @Override
-  public void invalidAllCache() {
-    schemaCache.invalidateAll();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 450c4dc3b0..7a5403af91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -92,6 +92,4 @@ public interface ISchemaFetcher {
   Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern);
 
   Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName);
-
-  void invalidAllCache();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 6a6e9e1b3f..7f45d9b5e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1625,7 +1625,12 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     status.setMessage("disable datanode succeed");
     // TODO what need to clean?
     ClusterPartitionFetcher.getInstance().invalidAllCache();
-    DataNodeSchemaCache.getInstance().cleanUp();
+    DataNodeSchemaCache.getInstance().takeWriteLock();
+    try {
+      DataNodeSchemaCache.getInstance().cleanUp();
+    } finally {
+      DataNodeSchemaCache.getInstance().releaseWriteLock();
+    }
     DataNodeDevicePathCache.getInstance().cleanUp();
     return status;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index 0b5910d4df..5556cb33ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -233,4 +233,54 @@ public class DataNodeSchemaCacheTest {
     schemaTree.setDatabases(Collections.singleton("root.sg1"));
     return schemaTree;
   }
+
+  @Test
+  public void testUpdateLastCache() throws IllegalPathException {
+    String database = "root.db";
+    PartialPath device = new PartialPath("root.db.d");
+
+    String[] measurements = new String[] {"s1", "s2", "s3"};
+    MeasurementSchema[] measurementSchemas =
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.INT32),
+          new MeasurementSchema("s2", TSDataType.INT32),
+          new MeasurementSchema("s3", TSDataType.INT32)
+        };
+
+    dataNodeSchemaCache.updateLastCache(
+        database,
+        device,
+        measurements,
+        measurementSchemas,
+        true,
+        index -> new TimeValuePair(1, new TsPrimitiveType.TsInt(1)),
+        index -> index != 1,
+        true,
+        1L);
+
+    Assert.assertNotNull(dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s1")));
+    Assert.assertNull(dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s2")));
+    Assert.assertNotNull(dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s3")));
+
+    dataNodeSchemaCache.updateLastCache(
+        database,
+        device,
+        measurements,
+        measurementSchemas,
+        true,
+        index -> new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
+        index -> true,
+        true,
+        1L);
+
+    Assert.assertEquals(
+        new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
+        dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s1")));
+    Assert.assertEquals(
+        new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
+        dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s2")));
+    Assert.assertEquals(
+        new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
+        dataNodeSchemaCache.getLastCache(new PartialPath("root.db.d.s3")));
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 3a3b97cb3d..8cdfeeba81 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -147,7 +147,4 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
   public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
     return null;
   }
-
-  @Override
-  public void invalidAllCache() {}
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index 7123e2c389..a8aa156554 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -343,9 +343,6 @@ public class Util {
       public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
         return null;
       }
-
-      @Override
-      public void invalidAllCache() {}
     };
   }