You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/15 02:55:54 UTC

[iotdb] branch master updated: [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d17d1e80d3 [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)
d17d1e80d3 is described below

commit d17d1e80d3fed83c3ae3cc86ebe70cf7d1bc1580
Author: Potato <ta...@apache.org>
AuthorDate: Thu Dec 15 10:55:48 2022 +0800

    [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)
    
    * change version filename && add compatibility solution/test
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * fix review
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * rename to uppercase
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    Co-authored-by: Jinrui.Zhang <xi...@gmail.com>
---
 .../iot/logdispatcher/IndexController.java         | 43 ++++++++++++++++++++--
 .../consensus/iot/logdispatcher/LogDispatcher.java |  3 +-
 .../iot/logdispatcher/IndexControllerTest.java     | 37 +++++++++++++++----
 .../iot/logdispatcher/SyncStatusTest.java          | 15 +++++---
 4 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index e5242c6611..7630849928 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -30,12 +32,16 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** An index controller class to balance the performance degradation of frequent disk I/O. */
 @ThreadSafe
 public class IndexController {
 
+  public static final String SEPARATOR = "-";
+
   private final Logger logger = LoggerFactory.getLogger(IndexController.class);
 
   private long lastFlushedIndex;
@@ -44,16 +50,23 @@ public class IndexController {
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final String storageDir;
+
+  private final Peer peer;
   private final String prefix;
   private final long initialIndex;
 
   private final long checkpointGap;
 
-  public IndexController(String storageDir, String prefix, long initialIndex, long checkpointGap) {
+  public IndexController(String storageDir, Peer peer, long initialIndex, long checkpointGap) {
     this.storageDir = storageDir;
-    this.prefix = prefix + '-';
+    this.peer = peer;
+    this.prefix = peer.getNodeId() + SEPARATOR;
     this.checkpointGap = checkpointGap;
     this.initialIndex = initialIndex;
+    // This is because we changed the name of the version file in version 1.0.1. In order to ensure
+    // compatibility with version 1.0.0, we need to add this function. We will remove this function
+    // in the future version 2.x.
+    upgrade();
     restore();
   }
 
@@ -124,6 +137,30 @@ public class IndexController {
     }
   }
 
+  private void upgrade() {
+    File directory = new File(storageDir);
+    String oldPrefix = Utils.fromTEndPointToString(peer.getEndpoint()) + SEPARATOR;
+    Optional.ofNullable(directory.listFiles((dir, name) -> name.startsWith(oldPrefix)))
+        .ifPresent(
+            files ->
+                Arrays.stream(files)
+                    .forEach(
+                        oldFile -> {
+                          String[] splits = oldFile.getName().split(SEPARATOR);
+                          long fileVersion = Long.parseLong(splits[splits.length - 1]);
+                          File newFile = new File(storageDir, prefix + fileVersion);
+                          try {
+                            logger.info(
+                                "version file upgrade, previous: {}, current: {}",
+                                oldFile.getAbsolutePath(),
+                                newFile.getAbsolutePath());
+                            FileUtils.moveFile(oldFile, newFile);
+                          } catch (IOException e) {
+                            logger.error("Error occurred when upgrading version file", e);
+                          }
+                        }));
+  }
+
   private void restore() {
     File directory = new File(storageDir);
     File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
@@ -132,7 +169,7 @@ public class IndexController {
       long maxVersion = 0;
       int maxVersionIndex = 0;
       for (int i = 0; i < versionFiles.length; i++) {
-        long fileVersion = Long.parseLong(versionFiles[i].getName().split("-")[1]);
+        long fileVersion = Long.parseLong(versionFiles[i].getName().split(SEPARATOR)[1]);
         if (fileVersion > maxVersion) {
           maxVersion = fileVersion;
           maxVersionIndex = i;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c6eb05aa0a..c89e9f50fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
-import org.apache.iotdb.consensus.ratis.Utils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.thrift.TException;
@@ -204,7 +203,7 @@ public class LogDispatcher {
       this.controller =
           new IndexController(
               impl.getStorageDir(),
-              Utils.fromTEndPointToString(peer.getEndpoint()),
+              peer,
               initialSyncIndex,
               config.getReplication().getCheckpointGap());
       this.syncStatus = new SyncStatus(controller, config);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index 183c35e2ee..7051f47930 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
+
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -27,11 +32,14 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 
 public class IndexControllerTest {
 
   private static final File storageDir = new File("target" + java.io.File.separator + "test");
-  private static final String prefix = "version";
+
+  private static final Peer peer =
+      new Peer(new DataRegionId(1), 2, new TEndPoint("datanode-1.datanode-svc", 6667));
 
   private static final long CHECK_POINT_GAP = 500;
 
@@ -45,11 +53,10 @@ public class IndexControllerTest {
     FileUtils.deleteFully(storageDir);
   }
 
-  /** test indexController when incrementIntervalAfterRestart == false */
   @Test
-  public void testIncrementIntervalAfterRestart() {
+  public void testRestart() {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -58,7 +65,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -66,7 +73,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
@@ -74,7 +81,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
@@ -82,4 +89,20 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
   }
+
+  @Test
+  public void testUpgrade() throws IOException {
+    File oldFile =
+        new File(
+            storageDir,
+            Utils.fromTEndPointToString(peer.getEndpoint()) + IndexController.SEPARATOR + 100);
+    Files.createFile(oldFile.toPath());
+
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
+    Assert.assertEquals(100, controller.getCurrentIndex());
+
+    File newFile = new File(storageDir, peer.getNodeId() + IndexController.SEPARATOR + 100);
+    Assert.assertTrue(newFile.exists());
+  }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
index b0a85612f7..f9c78622db 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
 
@@ -38,7 +41,9 @@ import java.util.concurrent.ExecutionException;
 public class SyncStatusTest {
 
   private static final File storageDir = new File("target" + java.io.File.separator + "test");
-  private static final String prefix = "version";
+
+  private static final Peer peer =
+      new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667));
   private static final IoTConsensusConfig config = new IoTConsensusConfig.Builder().build();
   private static final long CHECK_POINT_GAP = 500;
 
@@ -56,7 +61,7 @@ public class SyncStatusTest {
   @Test
   public void sequenceTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
@@ -86,7 +91,7 @@ public class SyncStatusTest {
   @Test
   public void reverseTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -123,7 +128,7 @@ public class SyncStatusTest {
   @Test
   public void mixedTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -172,7 +177,7 @@ public class SyncStatusTest {
   @Test
   public void waitTest() throws InterruptedException, ExecutionException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);