You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/12 05:39:12 UTC
[iotdb] branch master updated: [IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8807)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee81d4d68 [IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8807)
4ee81d4d68 is described below
commit 4ee81d4d68315a2b68729f49b8f04cc6f4644a03
Author: BUAAserein <65...@users.noreply.github.com>
AuthorDate: Thu Jan 12 13:39:06 2023 +0800
[IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8807)
---
.../consensus/iot/IoTConsensusServerImpl.java | 54 +++++++++++++--------
.../apache/iotdb/consensus/iot/StabilityTest.java | 55 ++++++++++++++++++++--
2 files changed, 84 insertions(+), 25 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..4b19f64093 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -74,15 +74,14 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
public class IoTConsensusServerImpl {
@@ -103,7 +102,8 @@ public class IoTConsensusServerImpl {
private final IoTConsensusConfig config;
private final ConsensusReqReader reader;
private volatile boolean active;
- private String latestSnapshotId;
+  // Directory name (under storageDir) of the snapshot most recently produced by takeSnapshot();
+  // transitSnapshot, the snapshot-load trigger and the cleanup request all refer to it by name.
+  private String newSnapshotDirName;
+  // Matches everything up to and including the last non-digit that is directly followed by a
+  // digit, so replaceAll("") leaves only the trailing digits of a name that ends in digits.
+  private static final Pattern snapshotIndexPatten =
+      Pattern.compile(".*[^\\d](?=(\\d+))");
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final IoTConsensusServerMetrics metrics;
@@ -287,14 +287,11 @@ public class IoTConsensusServerImpl {
public void takeSnapshot() throws ConsensusGroupModifyPeerException {
try {
- // TODO: We should use logic clock such as searchIndex rather than wall clock to mark the
- // snapshot, otherwise there will be bugs in situations where the clock might fall back, such
- // as CI
- latestSnapshotId =
+ long newSnapshotIndex = getLatestSnapshotIndex() + 1;
+ newSnapshotDirName =
String.format(
- "%s_%s_%d",
- SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), System.currentTimeMillis());
- File snapshotDir = new File(storageDir, latestSnapshotId);
+ "%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), newSnapshotIndex);
+ File snapshotDir = new File(storageDir, newSnapshotDirName);
if (snapshotDir.exists()) {
FileUtils.deleteDirectory(snapshotDir);
}
@@ -312,13 +309,13 @@ public class IoTConsensusServerImpl {
}
public void transitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
- File snapshotDir = new File(storageDir, latestSnapshotId);
+ File snapshotDir = new File(storageDir, newSnapshotDirName);
List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
logger.info("transit snapshots: {}", snapshotPaths);
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
for (Path path : snapshotPaths) {
- SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path);
+ SnapshotFragmentReader reader = new SnapshotFragmentReader(newSnapshotDirName, path);
try {
while (reader.hasNext()) {
TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
@@ -370,6 +367,22 @@ public class IoTConsensusServerImpl {
return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
}
+  /**
+   * Returns the largest snapshot index among the entries of {@code storageDir} whose name starts
+   * with {@code SNAPSHOT_DIR_NAME}, or 0 when there are none. The index is the trailing run of
+   * digits in the entry name (e.g. the {@code 5} in {@code snapshot_1_5}).
+   *
+   * <p>Entries whose name does not end in a parseable {@code long} (stray files, truncated names)
+   * are logged and skipped instead of letting an unchecked {@link NumberFormatException} abort
+   * {@code takeSnapshot}.
+   *
+   * @return the highest index found, or 0 if no parseable snapshot directory exists
+   */
+  private long getLatestSnapshotIndex() {
+    long latestIndex = 0;
+    // listFiles returns null when storageDir is not a readable directory.
+    File[] snapshotDirs =
+        new File(storageDir).listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
+    if (snapshotDirs == null) {
+      return latestIndex;
+    }
+    for (File snapshotDir : snapshotDirs) {
+      String indexText = snapshotIndexPatten.matcher(snapshotDir.getName()).replaceAll("");
+      try {
+        latestIndex = Math.max(latestIndex, Long.parseLong(indexText));
+      } catch (NumberFormatException e) {
+        logger.warn(
+            "Skip {} when computing the latest snapshot index: no numeric suffix",
+            snapshotDir.getAbsolutePath());
+      }
+    }
+    return latestIndex;
+  }
+
private void clearOldSnapshot() {
File directory = new File(storageDir);
File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
@@ -379,12 +392,13 @@ public class IoTConsensusServerImpl {
thisNode.getGroupId());
return;
}
- Arrays.sort(versionFiles, Comparator.comparing(File::getName));
- for (int i = 0; i < versionFiles.length - 1; i++) {
- try {
- FileUtils.deleteDirectory(versionFiles[i]);
- } catch (IOException e) {
- logger.error("Delete old snapshot dir {} failed", versionFiles[i].getAbsolutePath(), e);
+ for (File file : versionFiles) {
+ if (!file.getName().equals(newSnapshotDirName)) {
+ try {
+ FileUtils.deleteDirectory(file);
+ } catch (IOException e) {
+ logger.error("Delete old snapshot dir {} failed", file.getAbsolutePath(), e);
+ }
}
}
}
@@ -416,7 +430,7 @@ public class IoTConsensusServerImpl {
TTriggerSnapshotLoadRes res =
client.triggerSnapshotLoad(
new TTriggerSnapshotLoadReq(
- thisNode.getGroupId().convertToTConsensusGroupId(), latestSnapshotId));
+ thisNode.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName));
if (!isSuccess(res.status)) {
throw new ConsensusGroupModifyPeerException(
String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
@@ -744,7 +758,7 @@ public class IoTConsensusServerImpl {
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
TCleanupTransferredSnapshotReq req =
new TCleanupTransferredSnapshotReq(
- targetPeer.getGroupId().convertToTConsensusGroupId(), latestSnapshotId);
+ targetPeer.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName);
TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
if (!isSuccess(res.getStatus())) {
throw new ConsensusGroupModifyPeerException(
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index e7db5ad6e3..ea7c15c8f1 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.iot.util.TestStateMachine;
import org.apache.ratis.util.FileUtils;
@@ -78,7 +79,13 @@ public class StabilityTest {
}
@Test
- public void snapshotTest() throws Exception {
+ public void allTest() throws Exception {
+ // Runs the three scenarios in order on the same consensusImpl. Each scenario creates the
+ // dataRegionId peer at its start and deletes it at its end.
+ peerTest();
+ snapshotTest();
+ snapshotUpgradeTest();
+ }
+
+ public void peerTest() throws Exception {
consensusImpl.createPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
@@ -94,11 +101,15 @@ public class StabilityTest {
consensusImpl.createPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
-
Assert.assertTrue(response.isSuccess());
+ consensusImpl.deletePeer(dataRegionId);
+ }
+ public void snapshotTest() throws IOException {
+ consensusImpl.createPeer(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
consensusImpl.triggerSnapshot(dataRegionId);
- Thread.sleep(10);
File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId));
@@ -108,14 +119,48 @@ public class StabilityTest {
Assert.assertEquals(1, versionFiles1.length);
consensusImpl.triggerSnapshot(dataRegionId);
- Thread.sleep(10);
+
consensusImpl.triggerSnapshot(dataRegionId);
File[] versionFiles2 =
dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
Assert.assertNotNull(versionFiles2);
Assert.assertEquals(1, versionFiles2.length);
-
Assert.assertNotEquals(versionFiles1[0].getName(), versionFiles2[0].getName());
+ consensusImpl.deletePeer(dataRegionId);
+ }
+
+  /**
+   * Simulates upgrading from the old wall-clock snapshot naming. A snapshot directory whose index
+   * is the current time in milliseconds is planted in the peer directory. The next snapshot must
+   * then take that index plus one, and the planted directory must be removed.
+   */
+  public void snapshotUpgradeTest() throws Exception {
+    consensusImpl.createPeer(
+        dataRegionId,
+        Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
+    consensusImpl.triggerSnapshot(dataRegionId);
+
+    // Resolve the peer directory the same way the rest of this test class does, instead of
+    // hard-coding "1_1", so planting and listing always look at the same directory.
+    File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId));
+
+    // Plant a snapshot directory named with a wall-clock index, as older versions produced.
+    long oldSnapshotIndex = System.currentTimeMillis();
+    String oldSnapshotDirName =
+        String.format(
+            "%s_%s_%d",
+            IoTConsensusServerImpl.SNAPSHOT_DIR_NAME, dataRegionId.getId(), oldSnapshotIndex);
+    File oldSnapshotDir = new File(dataDir, oldSnapshotDirName);
+    if (oldSnapshotDir.exists()) {
+      FileUtils.deleteFully(oldSnapshotDir);
+    }
+    if (!oldSnapshotDir.mkdirs()) {
+      throw new ConsensusGroupModifyPeerException(
+          String.format("%s: cannot mkdir for snapshot", dataRegionId));
+    }
+
+    consensusImpl.triggerSnapshot(dataRegionId);
+    Assert.assertFalse(oldSnapshotDir.exists());
+
+    File[] snapshotFiles =
+        dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
+    Assert.assertNotNull(snapshotFiles);
+    Assert.assertEquals(1, snapshotFiles.length);
+    // The new snapshot continues the logical clock from the largest index it found on disk.
+    Assert.assertEquals(
+        oldSnapshotIndex + 1,
+        Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", "")));
+    consensusImpl.deletePeer(dataRegionId);
+  }
}