You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this text version.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/12 05:39:12 UTC

[iotdb] branch master updated: [IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8807)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee81d4d68 [IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8807)
4ee81d4d68 is described below

commit 4ee81d4d68315a2b68729f49b8f04cc6f4644a03
Author: BUAAserein <65...@users.noreply.github.com>
AuthorDate: Thu Jan 12 13:39:06 2023 +0800

    [IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8807)
---
 .../consensus/iot/IoTConsensusServerImpl.java      | 54 +++++++++++++--------
 .../apache/iotdb/consensus/iot/StabilityTest.java  | 55 ++++++++++++++++++++--
 2 files changed, 84 insertions(+), 25 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..4b19f64093 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -74,15 +74,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
 
 public class IoTConsensusServerImpl {
 
@@ -103,7 +102,8 @@ public class IoTConsensusServerImpl {
   private final IoTConsensusConfig config;
   private final ConsensusReqReader reader;
   private volatile boolean active;
-  private String latestSnapshotId;
+  private String newSnapshotDirName;
+  private static final Pattern snapshotIndexPatten = Pattern.compile(".*[^\\d](?=(\\d+))");
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
   private final IoTConsensusServerMetrics metrics;
 
@@ -287,14 +287,11 @@ public class IoTConsensusServerImpl {
 
   public void takeSnapshot() throws ConsensusGroupModifyPeerException {
     try {
-      // TODO: We should use logic clock such as searchIndex rather than wall clock to mark the
-      // snapshot, otherwise there will be bugs in situations where the clock might fall back, such
-      // as CI
-      latestSnapshotId =
+      long newSnapshotIndex = getLatestSnapshotIndex() + 1;
+      newSnapshotDirName =
           String.format(
-              "%s_%s_%d",
-              SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), System.currentTimeMillis());
-      File snapshotDir = new File(storageDir, latestSnapshotId);
+              "%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), newSnapshotIndex);
+      File snapshotDir = new File(storageDir, newSnapshotDirName);
       if (snapshotDir.exists()) {
         FileUtils.deleteDirectory(snapshotDir);
       }
@@ -312,13 +309,13 @@ public class IoTConsensusServerImpl {
   }
 
   public void transitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
-    File snapshotDir = new File(storageDir, latestSnapshotId);
+    File snapshotDir = new File(storageDir, newSnapshotDirName);
     List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
     logger.info("transit snapshots: {}", snapshotPaths);
     try (SyncIoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       for (Path path : snapshotPaths) {
-        SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path);
+        SnapshotFragmentReader reader = new SnapshotFragmentReader(newSnapshotDirName, path);
         try {
           while (reader.hasNext()) {
             TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
@@ -370,6 +367,22 @@ public class IoTConsensusServerImpl {
     return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
   }
 
+  private long getLatestSnapshotIndex() {
+    long snapShotIndex = 0;
+    File directory = new File(storageDir);
+    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
+    if (versionFiles == null || versionFiles.length == 0) {
+      return snapShotIndex;
+    }
+    for (File file : versionFiles) {
+      snapShotIndex =
+          Long.max(
+              snapShotIndex,
+              Long.parseLong(snapshotIndexPatten.matcher(file.getName()).replaceAll("")));
+    }
+    return snapShotIndex;
+  }
+
   private void clearOldSnapshot() {
     File directory = new File(storageDir);
     File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
@@ -379,12 +392,13 @@ public class IoTConsensusServerImpl {
           thisNode.getGroupId());
       return;
     }
-    Arrays.sort(versionFiles, Comparator.comparing(File::getName));
-    for (int i = 0; i < versionFiles.length - 1; i++) {
-      try {
-        FileUtils.deleteDirectory(versionFiles[i]);
-      } catch (IOException e) {
-        logger.error("Delete old snapshot dir {} failed", versionFiles[i].getAbsolutePath(), e);
+    for (File file : versionFiles) {
+      if (!file.getName().equals(newSnapshotDirName)) {
+        try {
+          FileUtils.deleteDirectory(file);
+        } catch (IOException e) {
+          logger.error("Delete old snapshot dir {} failed", file.getAbsolutePath(), e);
+        }
       }
     }
   }
@@ -416,7 +430,7 @@ public class IoTConsensusServerImpl {
       TTriggerSnapshotLoadRes res =
           client.triggerSnapshotLoad(
               new TTriggerSnapshotLoadReq(
-                  thisNode.getGroupId().convertToTConsensusGroupId(), latestSnapshotId));
+                  thisNode.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName));
       if (!isSuccess(res.status)) {
         throw new ConsensusGroupModifyPeerException(
             String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
@@ -744,7 +758,7 @@ public class IoTConsensusServerImpl {
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       TCleanupTransferredSnapshotReq req =
           new TCleanupTransferredSnapshotReq(
-              targetPeer.getGroupId().convertToTConsensusGroupId(), latestSnapshotId);
+              targetPeer.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName);
       TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
       if (!isSuccess(res.getStatus())) {
         throw new ConsensusGroupModifyPeerException(
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index e7db5ad6e3..ea7c15c8f1 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.iot.util.TestStateMachine;
 
 import org.apache.ratis.util.FileUtils;
@@ -78,7 +79,13 @@ public class StabilityTest {
   }
 
   @Test
-  public void snapshotTest() throws Exception {
+  public void allTest() throws Exception {
+    peerTest();
+    snapshotTest();
+    snapshotUpgradeTest();
+  }
+
+  public void peerTest() throws Exception {
     consensusImpl.createPeer(
         dataRegionId,
         Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
@@ -94,11 +101,15 @@ public class StabilityTest {
         consensusImpl.createPeer(
             dataRegionId,
             Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
-
     Assert.assertTrue(response.isSuccess());
+    consensusImpl.deletePeer(dataRegionId);
+  }
 
+  public void snapshotTest() throws IOException {
+    consensusImpl.createPeer(
+        dataRegionId,
+        Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
     consensusImpl.triggerSnapshot(dataRegionId);
-    Thread.sleep(10);
 
     File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId));
 
@@ -108,14 +119,48 @@ public class StabilityTest {
     Assert.assertEquals(1, versionFiles1.length);
 
     consensusImpl.triggerSnapshot(dataRegionId);
-    Thread.sleep(10);
+
     consensusImpl.triggerSnapshot(dataRegionId);
 
     File[] versionFiles2 =
         dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
     Assert.assertNotNull(versionFiles2);
     Assert.assertEquals(1, versionFiles2.length);
-
     Assert.assertNotEquals(versionFiles1[0].getName(), versionFiles2[0].getName());
+    consensusImpl.deletePeer(dataRegionId);
+  }
+
+  public void snapshotUpgradeTest() throws Exception {
+    consensusImpl.createPeer(
+        dataRegionId,
+        Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 9000))));
+    consensusImpl.triggerSnapshot(dataRegionId);
+    long oldSnapshotIndex = System.currentTimeMillis();
+    String oldSnapshotDirName =
+        String.format(
+            "%s_%s_%d",
+            IoTConsensusServerImpl.SNAPSHOT_DIR_NAME, dataRegionId.getId(), oldSnapshotIndex);
+    File regionDir = new File(storageDir, "1_1");
+    File oldSnapshotDir = new File(regionDir, oldSnapshotDirName);
+    if (oldSnapshotDir.exists()) {
+      FileUtils.deleteFully(oldSnapshotDir);
+    }
+    if (!oldSnapshotDir.mkdirs()) {
+      throw new ConsensusGroupModifyPeerException(
+          String.format("%s: cannot mkdir for snapshot", dataRegionId));
+    }
+    consensusImpl.triggerSnapshot(dataRegionId);
+    Assert.assertFalse(oldSnapshotDir.exists());
+
+    File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId));
+
+    File[] snapshotFiles =
+        dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
+    Assert.assertNotNull(snapshotFiles);
+    Assert.assertEquals(1, snapshotFiles.length);
+    Assert.assertEquals(
+        oldSnapshotIndex + 1,
+        Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", "")));
+    consensusImpl.deletePeer(dataRegionId);
   }
 }