You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/03/29 13:22:46 UTC
[iotdb] branch rel/1.1 updated: [IOTDB-5738] Retain 2 copies of snapshot (#9474) (#9475)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new ce038eaf62 [IOTDB-5738] Retain 2 copies of snapshot (#9474) (#9475)
ce038eaf62 is described below
commit ce038eaf62c3680d5b1dbf9278140e540eeb2caf
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Mar 29 21:22:39 2023 +0800
[IOTDB-5738] Retain 2 copies of snapshot (#9474) (#9475)
---
.../iotdb/consensus/ratis/SnapshotStorage.java | 10 ++--
.../org/apache/iotdb/consensus/ratis/Utils.java | 2 +
.../apache/iotdb/consensus/ratis/SnapshotTest.java | 53 +++++++++++++++-------
3 files changed, 46 insertions(+), 19 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index c7b843f994..d93663b27a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
@@ -166,11 +167,14 @@ public class SnapshotStorage implements StateMachineStorage {
@Override
public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
throws IOException {
- Path[] sortedSnapshotDirs = getSortedSnapshotDirPaths();
- if (sortedSnapshotDirs == null || sortedSnapshotDirs.length == 0) {
+ final Path[] sortedSnapshotDirs = getSortedSnapshotDirPaths();
+ if (ArrayUtils.isEmpty(sortedSnapshotDirs)) {
return;
}
- for (int i = 0; i < sortedSnapshotDirs.length - 1; i++) {
+
+ final int cleanIndex =
+ Math.max(0, sortedSnapshotDirs.length - snapshotRetentionPolicy.getNumSnapshotsRetained());
+ for (int i = 0; i < cleanIndex; i++) {
FileUtils.deleteFully(sortedSnapshotDirs[i]);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 639d7e2c15..c40f1b511b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -217,6 +217,8 @@ public class Utils {
RaftServerConfigKeys.Snapshot.setCreationGap(properties, config.getSnapshot().getCreationGap());
RaftServerConfigKeys.Snapshot.setRetentionFileNum(
properties, config.getSnapshot().getRetentionFileNum());
+ // FIXME: retain 2 copies to avoid race conditions between (delete) and (transfer)
+ RaftServerConfigKeys.Snapshot.setRetentionFileNum(properties, 2);
RaftServerConfigKeys.ThreadPool.setClientCached(
properties, config.getThreadPool().isClientCached());
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index 9a642f1bda..b2c3fae3ae 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.server.storage.RaftStorageMetadataFile;
import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -40,6 +41,7 @@ import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Scanner;
+import java.util.function.Predicate;
public class SnapshotTest {
@@ -97,37 +99,56 @@ public class SnapshotTest {
@Test
public void testSnapshot() throws Exception {
- ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.create(0, 0);
- RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
- ApplicationStateMachineProxy proxy =
+ final ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.create(0, 0);
+ final RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
+ final ApplicationStateMachineProxy proxy =
new ApplicationStateMachineProxy(new TestUtils.IntegerCounter(), raftGroupId);
proxy.initialize(null, null, new EmptyStorageWithOnlySMDir());
+ final Predicate<String> snapshotExists = s -> new File(s).exists();
+
+ proxy.notifyTermIndexUpdated(215, 72);
+ final String snapshotFilename0 =
+ TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "215_72");
+ final long index0 = proxy.takeSnapshot();
+ Assert.assertEquals(72, index0);
+ Assert.assertTrue(snapshotExists.test(snapshotFilename0));
+
// take a snapshot at 421-616
proxy.notifyTermIndexUpdated(421, 616);
- String snapshotFilename = TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "421_616");
- long index = proxy.takeSnapshot();
- Assert.assertEquals(index, 616);
- Assert.assertTrue(new File(snapshotFilename).exists());
+ final String snapshotFilename =
+ TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "421_616");
+ final long index = proxy.takeSnapshot();
+ Assert.assertEquals(616, index);
+ Assert.assertTrue(snapshotExists.test(snapshotFilename));
// take a snapshot at 616-4217
proxy.notifyTermIndexUpdated(616, 4217);
- String snapshotFilenameLatest =
+ final String snapshotFilenameLatest =
TestUtils.IntegerCounter.ensureSnapshotFileName(testDir, "616_4217");
- long indexLatest = proxy.takeSnapshot();
- Assert.assertEquals(indexLatest, 4217);
- Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+ final long indexLatest = proxy.takeSnapshot();
+ Assert.assertEquals(4217, indexLatest);
+ Assert.assertTrue(snapshotExists.test(snapshotFilenameLatest));
// query the latest snapshot
SnapshotInfo info = proxy.getLatestSnapshot();
- Assert.assertEquals(info.getTerm(), 616);
- Assert.assertEquals(info.getIndex(), 4217);
+ Assert.assertEquals(616, info.getTerm());
+ Assert.assertEquals(4217, info.getIndex());
// clean up
- proxy.getStateMachineStorage().cleanupOldSnapshots(null);
- Assert.assertFalse(new File(snapshotFilename).exists());
- Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+ proxy
+ .getStateMachineStorage()
+ .cleanupOldSnapshots(
+ new SnapshotRetentionPolicy() {
+ @Override
+ public int getNumSnapshotsRetained() {
+ return 2;
+ }
+ });
+ Assert.assertFalse(snapshotExists.test(snapshotFilename0));
+ Assert.assertTrue(snapshotExists.test(snapshotFilename));
+ Assert.assertTrue(snapshotExists.test(snapshotFilenameLatest));
}
static class CrossDiskLinkStatemachine extends TestUtils.IntegerCounter {