You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/11 01:51:20 UTC
[iotdb] branch master updated: [IOTDB-2711] Fix memory allocation deadlock by concurrent snapshot requests (#5195)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d16cea4 [IOTDB-2711] Fix memory allocation deadlock by concurrent snapshot requests (#5195)
d16cea4 is described below
commit d16cea42783f17d2e87ef0320d4cb876652f443f
Author: BaiJian <er...@hotmail.com>
AuthorDate: Fri Mar 11 09:47:51 2022 +0800
[IOTDB-2711] Fix memory allocation deadlock by concurrent snapshot requests (#5195)
---
.../org/apache/iotdb/cluster/log/Snapshot.java | 5 +
.../cluster/log/snapshot/MetaSimpleSnapshot.java | 89 +++++++++-------
.../cluster/log/snapshot/PartitionedSnapshot.java | 39 ++++---
.../server/handlers/caller/HeartbeatHandler.java | 6 +-
.../iotdb/cluster/server/member/RaftMember.java | 13 ++-
.../log/snapshot/MetaSimpleSnapshotTest.java | 118 ++++++++++++++++++++-
.../log/snapshot/PartitionedSnapshotTest.java | 81 +++++++++++++-
.../handlers/caller/HeartbeatHandlerTest.java | 17 +++
thrift-cluster/src/main/thrift/cluster.thrift | 1 +
9 files changed, 306 insertions(+), 63 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java
index 5b67a2f..b0c0761 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Snapshot.java
@@ -67,4 +67,9 @@ public abstract class Snapshot {
* @param minIndex
*/
public void truncateBefore(long minIndex) {}
+
+ @Override
+ public String toString() {
+ return String.format("%d-%d", lastLogIndex, lastLogTerm);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
index adca4aa..1cc376e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
@@ -48,6 +48,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.locks.Lock;
/** MetaSimpleSnapshot also records all storage groups. */
public class MetaSimpleSnapshot extends Snapshot {
@@ -218,53 +219,63 @@ public class MetaSimpleSnapshot extends Snapshot {
* authentication info, and last log term/index in the snapshot.
*/
private void installSnapshot(MetaSimpleSnapshot snapshot) {
- synchronized (metaGroupMember.getSnapshotApplyLock()) {
- // 1. register all storage groups
- for (Map.Entry<PartialPath, Long> entry : snapshot.getStorageGroupTTLMap().entrySet()) {
- PartialPath sgPath = entry.getKey();
- try {
- IoTDB.metaManager.setStorageGroup(sgPath);
- } catch (StorageGroupAlreadySetException e) {
- // ignore
- } catch (MetadataException e) {
- logger.error(
- "{}: Cannot add storage group {} in snapshot, errMessage:{}",
- metaGroupMember.getName(),
- entry.getKey(),
- e.getMessage());
+ Lock lock = metaGroupMember.getSnapshotApplyLock();
+ if (lock.tryLock()) {
+ try {
+ // 1. register all storage groups
+ for (Map.Entry<PartialPath, Long> entry : snapshot.getStorageGroupTTLMap().entrySet()) {
+ PartialPath sgPath = entry.getKey();
+ try {
+ IoTDB.metaManager.setStorageGroup(sgPath);
+ } catch (StorageGroupAlreadySetException e) {
+ // ignore
+ } catch (MetadataException e) {
+ logger.error(
+ "{}: Cannot add storage group {} in snapshot, errMessage:{}",
+ metaGroupMember.getName(),
+ entry.getKey(),
+ e.getMessage());
+ }
+
+ // 2. register ttl in the snapshot
+ try {
+ IoTDB.metaManager.setTTL(sgPath, entry.getValue());
+ StorageEngine.getInstance().setTTL(sgPath, entry.getValue());
+ } catch (MetadataException | IOException e) {
+ logger.error(
+ "{}: Cannot set ttl in storage group {} , errMessage: {}",
+ metaGroupMember.getName(),
+ entry.getKey(),
+ e.getMessage());
+ }
}
- // 2. register ttl in the snapshot
+ // 3. replace all users and roles
try {
- IoTDB.metaManager.setTTL(sgPath, entry.getValue());
- StorageEngine.getInstance().setTTL(sgPath, entry.getValue());
- } catch (MetadataException | IOException e) {
+ IAuthorizer authorizer = BasicAuthorizer.getInstance();
+ installSnapshotUsers(authorizer, snapshot);
+ installSnapshotRoles(authorizer, snapshot);
+ } catch (AuthException e) {
logger.error(
- "{}: Cannot set ttl in storage group {} , errMessage: {}",
- metaGroupMember.getName(),
- entry.getKey(),
- e.getMessage());
+ "{}: Cannot get authorizer instance, error is: ", metaGroupMember.getName(), e);
}
- }
+ // 4. accept template map
+ TemplateManager.getInstance().setTemplateMap(snapshot.templateMap);
- // 3. replace all users and roles
- try {
- IAuthorizer authorizer = BasicAuthorizer.getInstance();
- installSnapshotUsers(authorizer, snapshot);
- installSnapshotRoles(authorizer, snapshot);
- } catch (AuthException e) {
- logger.error(
- "{}: Cannot get authorizer instance, error is: ", metaGroupMember.getName(), e);
- }
- // 4. accept template map
- TemplateManager.getInstance().setTemplateMap(snapshot.templateMap);
+ // 5. accept partition table
+ metaGroupMember.acceptVerifiedPartitionTable(snapshot.getPartitionTableBuffer(), true);
- // 5. accept partition table
- metaGroupMember.acceptVerifiedPartitionTable(snapshot.getPartitionTableBuffer(), true);
-
- synchronized (metaGroupMember.getLogManager()) {
- metaGroupMember.getLogManager().applySnapshot(snapshot);
+ synchronized (metaGroupMember.getLogManager()) {
+ metaGroupMember.getLogManager().applySnapshot(snapshot);
+ }
+ } finally {
+ lock.unlock();
}
+ } else {
+ logger.info(
+ "{}: is under snapshot installation now. This request is omitted. MetaSimpleSnapshot: {}",
+ metaGroupMember.getName(),
+ snapshot);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index e346d9c..e243e4c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.concurrent.locks.Lock;
/** PartitionedSnapshot stores the snapshot of each slot in a map. */
public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
@@ -162,24 +163,30 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
*/
private void installPartitionedSnapshot(PartitionedSnapshot<T> snapshot)
throws SnapshotInstallationException {
- logger.info(
- "{}: start to install a snapshot of {}-{}",
- dataGroupMember.getName(),
- snapshot.lastLogIndex,
- snapshot.lastLogTerm);
- synchronized (dataGroupMember.getSnapshotApplyLock()) {
- List<Integer> slots =
- ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
- .getNodeSlots(dataGroupMember.getHeader());
- for (Integer slot : slots) {
- T subSnapshot = snapshot.getSnapshot(slot);
- if (subSnapshot != null) {
- installSnapshot(subSnapshot, slot);
+ logger.info("{}: start to install a snapshot of {}", dataGroupMember.getName(), snapshot);
+ Lock lock = dataGroupMember.getSnapshotApplyLock();
+ if (lock.tryLock()) {
+ try {
+ List<Integer> slots =
+ ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
+ .getNodeSlots(dataGroupMember.getHeader());
+ for (Integer slot : slots) {
+ T subSnapshot = snapshot.getSnapshot(slot);
+ if (subSnapshot != null) {
+ installSnapshot(subSnapshot, slot);
+ }
}
+ synchronized (dataGroupMember.getLogManager()) {
+ dataGroupMember.getLogManager().applySnapshot(snapshot);
+ }
+ } finally {
+ lock.unlock();
}
- synchronized (dataGroupMember.getLogManager()) {
- dataGroupMember.getLogManager().applySnapshot(snapshot);
- }
+ } else {
+ logger.info(
+ "{}: is under snapshot installation now. This request is omitted. PartitionedSnapshot: {}",
+ dataGroupMember.getName(),
+ snapshot);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index bccb16f..9fa1c66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -114,9 +114,9 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
peer.setMatchIndex(-1);
}
- // only start a catch up when the follower's lastLogIndex remains stall and unchanged for 3
- // heartbeats
- if (lastLogIdx == peer.getLastHeartBeatIndex()) {
+ // only start a catch up when the follower's lastLogIndex remains stalled and unchanged for 5
+ // heartbeats. If the follower is currently installing a snapshot, we reset the counter.
+ if (lastLogIdx == peer.getLastHeartBeatIndex() && !resp.isInstallingSnapshot()) {
// the follower's lastLogIndex is unchanged, increase inconsistent counter
int inconsistentNum = peer.incInconsistentHeartbeatNum();
if (inconsistentNum >= 5) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 9436df6..133af64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -112,6 +112,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
@@ -142,7 +144,7 @@ public abstract class RaftMember implements RaftMemberMBean {
*/
private final Object waitLeaderCondition = new Object();
/** the lock is to make sure that only one thread can apply snapshot at the same time */
- private final Object snapshotApplyLock = new Object();
+ private final Lock snapshotApplyLock = new ReentrantLock();
private final Object heartBeatWaitObject = new Object();
@@ -398,7 +400,12 @@ public abstract class RaftMember implements RaftMemberMBean {
// tell the leader the local log progress so it may decide whether to perform a catch up
response.setLastLogIndex(logManager.getLastLogIndex());
response.setLastLogTerm(logManager.getLastLogTerm());
-
+ // if the snapshot apply lock is held, it means that a snapshot is being installed now.
+ boolean isFree = snapshotApplyLock.tryLock();
+ if (isFree) {
+ snapshotApplyLock.unlock();
+ }
+ response.setInstallingSnapshot(!isFree);
if (logger.isDebugEnabled()) {
logger.debug(
"{}: log commit log index = {}, max have applied commit index = {}",
@@ -1970,7 +1977,7 @@ public abstract class RaftMember implements RaftMemberMBean {
this.appendLogThreadPool = appendLogThreadPool;
}
- public Object getSnapshotApplyLock() {
+ public Lock getSnapshotApplyLock() {
return snapshotApplyLock;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
index 1c9956f..f8317a5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
@@ -48,8 +48,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -148,7 +153,7 @@ public class MetaSimpleSnapshotTest extends IoTDBTest {
}
@Test
- public void testInstall()
+ public void testInstallSuccessfully()
throws IllegalPathException, SnapshotInstallationException, AuthException {
Map<PartialPath, Long> storageGroupTTLMap = new HashMap<>();
Map<String, User> userMap = new HashMap<>();
@@ -226,4 +231,115 @@ public class MetaSimpleSnapshotTest extends IoTDBTest {
assertEquals(lastLogTerm, metaGroupMember.getLogManager().getLastLogTerm());
assertTrue(subServerInitialized);
}
+
+ @Test
+ public void testInstallOmitted()
+ throws IllegalPathException, SnapshotInstallationException, AuthException,
+ InterruptedException {
+ Map<PartialPath, Long> storageGroupTTLMap = new HashMap<>();
+ Map<String, User> userMap = new HashMap<>();
+ Map<String, Role> roleMap = new HashMap<>();
+ Map<String, Template> templateMap = new HashMap<>();
+ PartitionTable partitionTable = TestUtils.getPartitionTable(10);
+ long lastLogIndex = 10;
+ long lastLogTerm = 5;
+
+ for (int i = 0; i < 10; i++) {
+ PartialPath partialPath = new PartialPath("root.ln.sg" + i);
+ storageGroupTTLMap.put(partialPath, (long) i);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ String userName = "user_" + i;
+ User user = new User(userName, "password_" + i);
+ userMap.put(userName, user);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ String roleName = "role_" + i;
+ Role role = new Role(roleName);
+ roleMap.put(roleName, role);
+ }
+
+ CreateTemplatePlan createTemplatePlan = CreateTemplatePlanUtil.getCreateTemplatePlan();
+
+ for (int i = 0; i < 10; i++) {
+ String templateName = "template_" + i;
+ createTemplatePlan.setName(templateName);
+ Template template = new Template(createTemplatePlan);
+ templateMap.put(templateName, template);
+ }
+
+ MetaSimpleSnapshot metaSimpleSnapshot =
+ new MetaSimpleSnapshot(
+ storageGroupTTLMap, userMap, roleMap, templateMap, partitionTable.serialize());
+ metaSimpleSnapshot.setLastLogIndex(lastLogIndex);
+ metaSimpleSnapshot.setLastLogTerm(lastLogTerm);
+
+ AtomicBoolean isLocked = new AtomicBoolean(false);
+ Lock snapshotLock = metaGroupMember.getSnapshotApplyLock();
+ Lock signalLock = new ReentrantLock();
+ signalLock.lock();
+ try {
+ // Simulate another snapshot being installed
+ new Thread(
+ () -> {
+ boolean localLocked = snapshotLock.tryLock();
+ if (localLocked) {
+ isLocked.set(true);
+ // Use signalLock to make sure this thread can hold the snapshotLock as long as
+ // possible
+ signalLock.lock();
+ signalLock.unlock();
+ snapshotLock.unlock();
+ }
+ })
+ .start();
+ // Wait for the other thread to acquire the snapshotLock
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(100);
+ if (isLocked.get()) {
+ break;
+ }
+ }
+ Assert.assertTrue(isLocked.get());
+ SnapshotInstaller defaultInstaller = metaSimpleSnapshot.getDefaultInstaller(metaGroupMember);
+ defaultInstaller.install(metaSimpleSnapshot, -1, false);
+
+ Map<PartialPath, Long> storageGroupsTTL = IoTDB.metaManager.getStorageGroupsTTL();
+ for (int i = 0; i < 10; i++) {
+ PartialPath partialPath = new PartialPath("root.ln.sg" + i);
+ assertNull(storageGroupsTTL.get(partialPath));
+ }
+
+ for (int i = 0; i < 5; i++) {
+ String userName = "user_" + i;
+ User user = BasicAuthorizer.getInstance().getUser(userName);
+ assertNull(user);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ String roleName = "role_" + i;
+ Role role = BasicAuthorizer.getInstance().getRole(roleName);
+ assertNull(role);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ String templateName = "template_" + i;
+ try {
+ TemplateManager.getInstance().getTemplate(templateName);
+ fail();
+ } catch (UndefinedTemplateException e) {
+ // Do nothing
+ }
+ }
+
+ assertNull(metaGroupMember.getPartitionTable());
+ assertEquals(-1, metaGroupMember.getLogManager().getLastLogIndex());
+ assertEquals(-1, metaGroupMember.getLogManager().getLastLogTerm());
+ assertFalse(subServerInitialized);
+ } finally {
+ signalLock.unlock();
+ }
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
index 27a1735..010ce41 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@@ -39,6 +40,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -74,7 +78,7 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
}
@Test
- public void testInstall()
+ public void testInstallSuccessfully()
throws IOException, WriteProcessException, SnapshotInstallationException,
IllegalPathException, StorageEngineException {
List<TsFileResource> tsFileResources = TestUtils.prepareTsFileResources(0, 10, 10, 10, true);
@@ -120,4 +124,79 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
assertFalse(tsFileResource.getTsFile().exists());
}
}
+
+ @Test
+ public void testInstallOmitted()
+ throws IOException, WriteProcessException, SnapshotInstallationException,
+ IllegalPathException, StorageEngineException, InterruptedException {
+ List<TsFileResource> tsFileResources = TestUtils.prepareTsFileResources(0, 10, 10, 10, true);
+ PartitionedSnapshot snapshot = new PartitionedSnapshot(FileSnapshot.Factory.INSTANCE);
+ List<TimeseriesSchema> timeseriesSchemas = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ FileSnapshot fileSnapshot = new FileSnapshot();
+ fileSnapshot.addFile(tsFileResources.get(i), TestUtils.getNode(i));
+ timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(0, i));
+ fileSnapshot.setTimeseriesSchemas(
+ Collections.singletonList(TestUtils.getTestTimeSeriesSchema(0, i)));
+ snapshot.putSnapshot(i, fileSnapshot);
+ }
+ snapshot.setLastLogIndex(10);
+ snapshot.setLastLogTerm(5);
+
+ AtomicBoolean isLocked = new AtomicBoolean(false);
+ Lock snapshotLock = dataGroupMember.getSnapshotApplyLock();
+ Lock signalLock = new ReentrantLock();
+ signalLock.lock();
+ try {
+ // Simulate another snapshot being installed
+ new Thread(
+ () -> {
+ boolean localLocked = snapshotLock.tryLock();
+ if (localLocked) {
+ isLocked.set(true);
+ // Use signalLock to make sure this thread can hold the snapshotLock as long as
+ // possible
+ signalLock.lock();
+ signalLock.unlock();
+ snapshotLock.unlock();
+ }
+ })
+ .start();
+ // Wait for the other thread to acquire the snapshotLock
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(100);
+ if (isLocked.get()) {
+ break;
+ }
+ }
+ Assert.assertTrue(isLocked.get());
+
+ SnapshotInstaller<PartitionedSnapshot> defaultInstaller =
+ snapshot.getDefaultInstaller(dataGroupMember);
+ for (int i = 0; i < 10; i++) {
+ dataGroupMember.getSlotManager().setToPulling(i, TestUtils.getNode(0));
+ }
+ defaultInstaller.install(snapshot, -1, false);
+ // after installation, the slot should be unchanged
+ for (int i = 0; i < 10; i++) {
+ assertEquals(SlotStatus.PULLING, dataGroupMember.getSlotManager().getStatus(i));
+ }
+
+ for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
+ assertFalse(IoTDB.metaManager.isPathExist(new PartialPath(timeseriesSchema.getFullPath())));
+ }
+ VirtualStorageGroupProcessor processor =
+ StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
+ assertEquals(-1, processor.getPartitionMaxFileVersions(0));
+ List<TsFileResource> loadedFiles = processor.getSequenceFileTreeSet();
+ assertEquals(0, loadedFiles.size());
+ assertEquals(0, processor.getUnSequenceFileList().size());
+
+ for (TsFileResource tsFileResource : tsFileResources) {
+ assertTrue(tsFileResource.getTsFile().exists());
+ }
+ } finally {
+ signalLock.unlock();
+ }
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
index d6b4ecf..8c4f7f1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
@@ -78,6 +78,7 @@ public class HeartbeatHandlerTest {
response.setLastLogTerm(-2);
response.setFollower(
new Node("192.168.0.6", 9003, 6, 40010, Constants.RPC_PORT, "192.168.0.6"));
+ response.setInstallingSnapshot(false);
catchUpFlag = false;
for (int i = 0; i < looseInconsistentNum; i++) {
handler.onComplete(response);
@@ -86,6 +87,22 @@ public class HeartbeatHandlerTest {
}
@Test
+ public void testSnapshotRequestOmitted() {
+ HeartbeatHandler handler = new HeartbeatHandler(metaGroupMember, TestUtils.getNode(1));
+ HeartBeatResponse response = new HeartBeatResponse();
+ response.setTerm(Response.RESPONSE_AGREE);
+ response.setLastLogTerm(-2);
+ response.setFollower(
+ new Node("192.168.0.6", 9003, 6, 40010, Constants.RPC_PORT, "192.168.0.6"));
+ response.setInstallingSnapshot(true);
+ catchUpFlag = false;
+ for (int i = 0; i < looseInconsistentNum; i++) {
+ handler.onComplete(response);
+ }
+ assertFalse(catchUpFlag);
+ }
+
+ @Test
public void testLeaderShipStale() {
HeartbeatHandler handler = new HeartbeatHandler(metaGroupMember, TestUtils.getNode(1));
HeartBeatResponse response = new HeartBeatResponse();
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 669bf21..7ae9ada 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -56,6 +56,7 @@ struct HeartBeatResponse {
// because a data server may play many data groups members, this is used to identify which
// member should process the request or response. Only used in data group communication.
7: optional Node header
+ 8: required bool installingSnapshot // whether the follower is installing a snapshot now
}
struct RequestCommitIndexResponse {