You are viewing a plain-text version of this content. The canonical (linked) version is available in the original mailing-list archive.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/11 01:51:20 UTC

[iotdb] branch master updated: [IOTDB-2711] Fix memory allocation deadlock by concurrent snapshot requests (#5195)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d16cea4  [IOTDB-2711] Fix memory allocation deadlock by concurrent snapshot requests (#5195)
d16cea4 is described below

commit d16cea42783f17d2e87ef0320d4cb876652f443f
Author: BaiJian <er...@hotmail.com>
AuthorDate: Fri Mar 11 09:47:51 2022 +0800

    [IOTDB-2711] Fix memory allocation deadlock by concurrent snapshot requests (#5195)
---
 .../org/apache/iotdb/cluster/log/Snapshot.java     |   5 +
 .../cluster/log/snapshot/MetaSimpleSnapshot.java   |  89 +++++++++-------
 .../cluster/log/snapshot/PartitionedSnapshot.java  |  39 ++++---
 .../server/handlers/caller/HeartbeatHandler.java   |   6 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  13 ++-
 .../log/snapshot/MetaSimpleSnapshotTest.java       | 118 ++++++++++++++++++++-
 .../log/snapshot/PartitionedSnapshotTest.java      |  81 +++++++++++++-
 .../handlers/caller/HeartbeatHandlerTest.java      |  17 +++
 thrift-cluster/src/main/thrift/cluster.thrift      |   1 +
 9 files changed, 306 insertions(+), 63 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java
index 5b67a2f..b0c0761 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java
@@ -67,4 +67,9 @@ public abstract class Snapshot {
    * @param minIndex
    */
   public void truncateBefore(long minIndex) {}
+
+  @Override
+  public String toString() {
+    return String.format("%d-%d", lastLogIndex, lastLogTerm);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
index adca4aa..1cc376e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
@@ -48,6 +48,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
 
 /** MetaSimpleSnapshot also records all storage groups. */
 public class MetaSimpleSnapshot extends Snapshot {
@@ -218,53 +219,63 @@ public class MetaSimpleSnapshot extends Snapshot {
      * authentication info, and last log term/index in the snapshot.
      */
     private void installSnapshot(MetaSimpleSnapshot snapshot) {
-      synchronized (metaGroupMember.getSnapshotApplyLock()) {
-        // 1.  register all storage groups
-        for (Map.Entry<PartialPath, Long> entry : snapshot.getStorageGroupTTLMap().entrySet()) {
-          PartialPath sgPath = entry.getKey();
-          try {
-            IoTDB.metaManager.setStorageGroup(sgPath);
-          } catch (StorageGroupAlreadySetException e) {
-            // ignore
-          } catch (MetadataException e) {
-            logger.error(
-                "{}: Cannot add storage group {} in snapshot, errMessage:{}",
-                metaGroupMember.getName(),
-                entry.getKey(),
-                e.getMessage());
+      Lock lock = metaGroupMember.getSnapshotApplyLock();
+      if (lock.tryLock()) {
+        try {
+          // 1.  register all storage groups
+          for (Map.Entry<PartialPath, Long> entry : snapshot.getStorageGroupTTLMap().entrySet()) {
+            PartialPath sgPath = entry.getKey();
+            try {
+              IoTDB.metaManager.setStorageGroup(sgPath);
+            } catch (StorageGroupAlreadySetException e) {
+              // ignore
+            } catch (MetadataException e) {
+              logger.error(
+                  "{}: Cannot add storage group {} in snapshot, errMessage:{}",
+                  metaGroupMember.getName(),
+                  entry.getKey(),
+                  e.getMessage());
+            }
+
+            // 2. register ttl in the snapshot
+            try {
+              IoTDB.metaManager.setTTL(sgPath, entry.getValue());
+              StorageEngine.getInstance().setTTL(sgPath, entry.getValue());
+            } catch (MetadataException | IOException e) {
+              logger.error(
+                  "{}: Cannot set ttl in storage group {} , errMessage: {}",
+                  metaGroupMember.getName(),
+                  entry.getKey(),
+                  e.getMessage());
+            }
           }
 
-          // 2. register ttl in the snapshot
+          // 3. replace all users and roles
           try {
-            IoTDB.metaManager.setTTL(sgPath, entry.getValue());
-            StorageEngine.getInstance().setTTL(sgPath, entry.getValue());
-          } catch (MetadataException | IOException e) {
+            IAuthorizer authorizer = BasicAuthorizer.getInstance();
+            installSnapshotUsers(authorizer, snapshot);
+            installSnapshotRoles(authorizer, snapshot);
+          } catch (AuthException e) {
             logger.error(
-                "{}: Cannot set ttl in storage group {} , errMessage: {}",
-                metaGroupMember.getName(),
-                entry.getKey(),
-                e.getMessage());
+                "{}: Cannot get authorizer instance, error is: ", metaGroupMember.getName(), e);
           }
-        }
+          // 4. accept template map
+          TemplateManager.getInstance().setTemplateMap(snapshot.templateMap);
 
-        // 3. replace all users and roles
-        try {
-          IAuthorizer authorizer = BasicAuthorizer.getInstance();
-          installSnapshotUsers(authorizer, snapshot);
-          installSnapshotRoles(authorizer, snapshot);
-        } catch (AuthException e) {
-          logger.error(
-              "{}: Cannot get authorizer instance, error is: ", metaGroupMember.getName(), e);
-        }
-        // 4. accept template map
-        TemplateManager.getInstance().setTemplateMap(snapshot.templateMap);
+          // 5. accept partition table
+          metaGroupMember.acceptVerifiedPartitionTable(snapshot.getPartitionTableBuffer(), true);
 
-        // 5. accept partition table
-        metaGroupMember.acceptVerifiedPartitionTable(snapshot.getPartitionTableBuffer(), true);
-
-        synchronized (metaGroupMember.getLogManager()) {
-          metaGroupMember.getLogManager().applySnapshot(snapshot);
+          synchronized (metaGroupMember.getLogManager()) {
+            metaGroupMember.getLogManager().applySnapshot(snapshot);
+          }
+        } finally {
+          lock.unlock();
         }
+      } else {
+        logger.info(
+            "{}: is under snapshot installation now. This request is omitted. MetaSimpleSnapshot: {}",
+            metaGroupMember.getName(),
+            snapshot);
       }
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index e346d9c..e243e4c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
 
 /** PartitionedSnapshot stores the snapshot of each slot in a map. */
 public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
@@ -162,24 +163,30 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
      */
     private void installPartitionedSnapshot(PartitionedSnapshot<T> snapshot)
         throws SnapshotInstallationException {
-      logger.info(
-          "{}: start to install a snapshot of {}-{}",
-          dataGroupMember.getName(),
-          snapshot.lastLogIndex,
-          snapshot.lastLogTerm);
-      synchronized (dataGroupMember.getSnapshotApplyLock()) {
-        List<Integer> slots =
-            ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
-                .getNodeSlots(dataGroupMember.getHeader());
-        for (Integer slot : slots) {
-          T subSnapshot = snapshot.getSnapshot(slot);
-          if (subSnapshot != null) {
-            installSnapshot(subSnapshot, slot);
+      logger.info("{}: start to install a snapshot of {}", dataGroupMember.getName(), snapshot);
+      Lock lock = dataGroupMember.getSnapshotApplyLock();
+      if (lock.tryLock()) {
+        try {
+          List<Integer> slots =
+              ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
+                  .getNodeSlots(dataGroupMember.getHeader());
+          for (Integer slot : slots) {
+            T subSnapshot = snapshot.getSnapshot(slot);
+            if (subSnapshot != null) {
+              installSnapshot(subSnapshot, slot);
+            }
           }
+          synchronized (dataGroupMember.getLogManager()) {
+            dataGroupMember.getLogManager().applySnapshot(snapshot);
+          }
+        } finally {
+          lock.unlock();
         }
-        synchronized (dataGroupMember.getLogManager()) {
-          dataGroupMember.getLogManager().applySnapshot(snapshot);
-        }
+      } else {
+        logger.info(
+            "{}: is under snapshot installation now. This request is omitted. PartitionedSnapshot: {}",
+            dataGroupMember.getName(),
+            snapshot);
       }
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index bccb16f..9fa1c66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -114,9 +114,9 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
         peer.setMatchIndex(-1);
       }
 
-      // only start a catch up when the follower's lastLogIndex remains stall and unchanged for 3
-      // heartbeats
-      if (lastLogIdx == peer.getLastHeartBeatIndex()) {
+      // only start a catch up when the follower's lastLogIndex remains stall and unchanged for 5
+      // heartbeats. If the follower is installing snapshot currently, we reset the counter.
+      if (lastLogIdx == peer.getLastHeartBeatIndex() && !resp.isInstallingSnapshot()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
         if (inconsistentNum >= 5) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 9436df6..133af64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -112,6 +112,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
 
@@ -142,7 +144,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    */
   private final Object waitLeaderCondition = new Object();
   /** the lock is to make sure that only one thread can apply snapshot at the same time */
-  private final Object snapshotApplyLock = new Object();
+  private final Lock snapshotApplyLock = new ReentrantLock();
 
   private final Object heartBeatWaitObject = new Object();
 
@@ -398,7 +400,12 @@ public abstract class RaftMember implements RaftMemberMBean {
         // tell the leader the local log progress so it may decide whether to perform a catch up
         response.setLastLogIndex(logManager.getLastLogIndex());
         response.setLastLogTerm(logManager.getLastLogTerm());
-
+        // if the snapshot apply lock is held, it means that a snapshot is installing now.
+        boolean isFree = snapshotApplyLock.tryLock();
+        if (isFree) {
+          snapshotApplyLock.unlock();
+        }
+        response.setInstallingSnapshot(!isFree);
         if (logger.isDebugEnabled()) {
           logger.debug(
               "{}: log commit log index = {}, max have applied commit index = {}",
@@ -1970,7 +1977,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     this.appendLogThreadPool = appendLogThreadPool;
   }
 
-  public Object getSnapshotApplyLock() {
+  public Lock getSnapshotApplyLock() {
     return snapshotApplyLock;
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
index 1c9956f..f8317a5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
@@ -48,8 +48,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -148,7 +153,7 @@ public class MetaSimpleSnapshotTest extends IoTDBTest {
   }
 
   @Test
-  public void testInstall()
+  public void testInstallSuccessfully()
       throws IllegalPathException, SnapshotInstallationException, AuthException {
     Map<PartialPath, Long> storageGroupTTLMap = new HashMap<>();
     Map<String, User> userMap = new HashMap<>();
@@ -226,4 +231,115 @@ public class MetaSimpleSnapshotTest extends IoTDBTest {
     assertEquals(lastLogTerm, metaGroupMember.getLogManager().getLastLogTerm());
     assertTrue(subServerInitialized);
   }
+
+  @Test
+  public void testInstallOmitted()
+      throws IllegalPathException, SnapshotInstallationException, AuthException,
+          InterruptedException {
+    Map<PartialPath, Long> storageGroupTTLMap = new HashMap<>();
+    Map<String, User> userMap = new HashMap<>();
+    Map<String, Role> roleMap = new HashMap<>();
+    Map<String, Template> templateMap = new HashMap<>();
+    PartitionTable partitionTable = TestUtils.getPartitionTable(10);
+    long lastLogIndex = 10;
+    long lastLogTerm = 5;
+
+    for (int i = 0; i < 10; i++) {
+      PartialPath partialPath = new PartialPath("root.ln.sg" + i);
+      storageGroupTTLMap.put(partialPath, (long) i);
+    }
+
+    for (int i = 0; i < 5; i++) {
+      String userName = "user_" + i;
+      User user = new User(userName, "password_" + i);
+      userMap.put(userName, user);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      String roleName = "role_" + i;
+      Role role = new Role(roleName);
+      roleMap.put(roleName, role);
+    }
+
+    CreateTemplatePlan createTemplatePlan = CreateTemplatePlanUtil.getCreateTemplatePlan();
+
+    for (int i = 0; i < 10; i++) {
+      String templateName = "template_" + i;
+      createTemplatePlan.setName(templateName);
+      Template template = new Template(createTemplatePlan);
+      templateMap.put(templateName, template);
+    }
+
+    MetaSimpleSnapshot metaSimpleSnapshot =
+        new MetaSimpleSnapshot(
+            storageGroupTTLMap, userMap, roleMap, templateMap, partitionTable.serialize());
+    metaSimpleSnapshot.setLastLogIndex(lastLogIndex);
+    metaSimpleSnapshot.setLastLogTerm(lastLogTerm);
+
+    AtomicBoolean isLocked = new AtomicBoolean(false);
+    Lock snapshotLock = metaGroupMember.getSnapshotApplyLock();
+    Lock signalLock = new ReentrantLock();
+    signalLock.lock();
+    try {
+      // Simulate another snapshot being installed
+      new Thread(
+              () -> {
+                boolean localLocked = snapshotLock.tryLock();
+                if (localLocked) {
+                  isLocked.set(true);
+                  // Use signalLock to make sure this thread can hold the snapshotLock as long as
+                  // possible
+                  signalLock.lock();
+                  signalLock.unlock();
+                  snapshotLock.unlock();
+                }
+              })
+          .start();
+      // Waiting another thread locking the snapshotLock
+      for (int i = 0; i < 10; i++) {
+        Thread.sleep(100);
+        if (isLocked.get()) {
+          break;
+        }
+      }
+      Assert.assertTrue(isLocked.get());
+      SnapshotInstaller defaultInstaller = metaSimpleSnapshot.getDefaultInstaller(metaGroupMember);
+      defaultInstaller.install(metaSimpleSnapshot, -1, false);
+
+      Map<PartialPath, Long> storageGroupsTTL = IoTDB.metaManager.getStorageGroupsTTL();
+      for (int i = 0; i < 10; i++) {
+        PartialPath partialPath = new PartialPath("root.ln.sg" + i);
+        assertNull(storageGroupsTTL.get(partialPath));
+      }
+
+      for (int i = 0; i < 5; i++) {
+        String userName = "user_" + i;
+        User user = BasicAuthorizer.getInstance().getUser(userName);
+        assertNull(user);
+      }
+
+      for (int i = 0; i < 10; i++) {
+        String roleName = "role_" + i;
+        Role role = BasicAuthorizer.getInstance().getRole(roleName);
+        assertNull(role);
+      }
+
+      for (int i = 0; i < 10; i++) {
+        String templateName = "template_" + i;
+        try {
+          TemplateManager.getInstance().getTemplate(templateName);
+          fail();
+        } catch (UndefinedTemplateException e) {
+          // Do nothing
+        }
+      }
+
+      assertNull(metaGroupMember.getPartitionTable());
+      assertEquals(-1, metaGroupMember.getLogManager().getLastLogIndex());
+      assertEquals(-1, metaGroupMember.getLogManager().getLastLogTerm());
+      assertFalse(subServerInitialized);
+    } finally {
+      signalLock.unlock();
+    }
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
index 27a1735..010ce41 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -39,6 +40,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -74,7 +78,7 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
   }
 
   @Test
-  public void testInstall()
+  public void testInstallSuccessfully()
       throws IOException, WriteProcessException, SnapshotInstallationException,
           IllegalPathException, StorageEngineException {
     List<TsFileResource> tsFileResources = TestUtils.prepareTsFileResources(0, 10, 10, 10, true);
@@ -120,4 +124,79 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
       assertFalse(tsFileResource.getTsFile().exists());
     }
   }
+
+  @Test
+  public void testInstallOmitted()
+      throws IOException, WriteProcessException, SnapshotInstallationException,
+          IllegalPathException, StorageEngineException, InterruptedException {
+    List<TsFileResource> tsFileResources = TestUtils.prepareTsFileResources(0, 10, 10, 10, true);
+    PartitionedSnapshot snapshot = new PartitionedSnapshot(FileSnapshot.Factory.INSTANCE);
+    List<TimeseriesSchema> timeseriesSchemas = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      FileSnapshot fileSnapshot = new FileSnapshot();
+      fileSnapshot.addFile(tsFileResources.get(i), TestUtils.getNode(i));
+      timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(0, i));
+      fileSnapshot.setTimeseriesSchemas(
+          Collections.singletonList(TestUtils.getTestTimeSeriesSchema(0, i)));
+      snapshot.putSnapshot(i, fileSnapshot);
+    }
+    snapshot.setLastLogIndex(10);
+    snapshot.setLastLogTerm(5);
+
+    AtomicBoolean isLocked = new AtomicBoolean(false);
+    Lock snapshotLock = dataGroupMember.getSnapshotApplyLock();
+    Lock signalLock = new ReentrantLock();
+    signalLock.lock();
+    try {
+      // Simulate another snapshot being installed
+      new Thread(
+              () -> {
+                boolean localLocked = snapshotLock.tryLock();
+                if (localLocked) {
+                  isLocked.set(true);
+                  // Use signalLock to make sure this thread can hold the snapshotLock as long as
+                  // possible
+                  signalLock.lock();
+                  signalLock.unlock();
+                  snapshotLock.unlock();
+                }
+              })
+          .start();
+      // Waiting another thread locking the snapshotLock
+      for (int i = 0; i < 10; i++) {
+        Thread.sleep(100);
+        if (isLocked.get()) {
+          break;
+        }
+      }
+      Assert.assertTrue(isLocked.get());
+
+      SnapshotInstaller<PartitionedSnapshot> defaultInstaller =
+          snapshot.getDefaultInstaller(dataGroupMember);
+      for (int i = 0; i < 10; i++) {
+        dataGroupMember.getSlotManager().setToPulling(i, TestUtils.getNode(0));
+      }
+      defaultInstaller.install(snapshot, -1, false);
+      // after installation, the slot should be unchanged
+      for (int i = 0; i < 10; i++) {
+        assertEquals(SlotStatus.PULLING, dataGroupMember.getSlotManager().getStatus(i));
+      }
+
+      for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
+        assertFalse(IoTDB.metaManager.isPathExist(new PartialPath(timeseriesSchema.getFullPath())));
+      }
+      VirtualStorageGroupProcessor processor =
+          StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
+      assertEquals(-1, processor.getPartitionMaxFileVersions(0));
+      List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+      assertEquals(0, loadedFiles.size());
+      assertEquals(0, processor.getUnSequenceFileList().size());
+
+      for (TsFileResource tsFileResource : tsFileResources) {
+        assertTrue(tsFileResource.getTsFile().exists());
+      }
+    } finally {
+      signalLock.unlock();
+    }
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
index d6b4ecf..8c4f7f1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
@@ -78,6 +78,7 @@ public class HeartbeatHandlerTest {
     response.setLastLogTerm(-2);
     response.setFollower(
         new Node("192.168.0.6", 9003, 6, 40010, Constants.RPC_PORT, "192.168.0.6"));
+    response.setInstallingSnapshot(false);
     catchUpFlag = false;
     for (int i = 0; i < looseInconsistentNum; i++) {
       handler.onComplete(response);
@@ -86,6 +87,22 @@ public class HeartbeatHandlerTest {
   }
 
   @Test
+  public void testSnapshotRequestOmitted() {
+    HeartbeatHandler handler = new HeartbeatHandler(metaGroupMember, TestUtils.getNode(1));
+    HeartBeatResponse response = new HeartBeatResponse();
+    response.setTerm(Response.RESPONSE_AGREE);
+    response.setLastLogTerm(-2);
+    response.setFollower(
+        new Node("192.168.0.6", 9003, 6, 40010, Constants.RPC_PORT, "192.168.0.6"));
+    response.setInstallingSnapshot(true);
+    catchUpFlag = false;
+    for (int i = 0; i < looseInconsistentNum; i++) {
+      handler.onComplete(response);
+    }
+    assertFalse(catchUpFlag);
+  }
+
+  @Test
   public void testLeaderShipStale() {
     HeartbeatHandler handler = new HeartbeatHandler(metaGroupMember, TestUtils.getNode(1));
     HeartBeatResponse response = new HeartBeatResponse();
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 669bf21..7ae9ada 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -56,6 +56,7 @@ struct HeartBeatResponse {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: required bool installingSnapshot // whether the follower is installing snapshot now
 }
 
 struct RequestCommitIndexResponse {