You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/15 02:57:44 UTC

[iotdb] branch jira5174 created (now a73a26ef0e)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5174
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at a73a26ef0e [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)

This branch includes the following new commits:

     new a73a26ef0e [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5174
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a73a26ef0eec35a50ee76cad0b385c9c4a8992b0
Author: Potato <ta...@apache.org>
AuthorDate: Thu Dec 15 10:55:48 2022 +0800

    [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)
    
    * change version filename && add compatibility solution/test
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * fix review
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * rename to uppercase
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    Co-authored-by: Jinrui.Zhang <xi...@gmail.com>
---
 .../iot/logdispatcher/IndexController.java         | 43 ++++++++++++++++++++--
 .../consensus/iot/logdispatcher/LogDispatcher.java |  3 +-
 .../iot/logdispatcher/IndexControllerTest.java     | 37 +++++++++++++++----
 .../iot/logdispatcher/SyncStatusTest.java          | 15 +++++---
 4 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index e5242c6611..7630849928 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -30,12 +32,16 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /** An index controller class to balance the performance degradation of frequent disk I/O. */
 @ThreadSafe
 public class IndexController {
 
+  public static final String SEPARATOR = "-";
+
   private final Logger logger = LoggerFactory.getLogger(IndexController.class);
 
   private long lastFlushedIndex;
@@ -44,16 +50,23 @@ public class IndexController {
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final String storageDir;
+
+  private final Peer peer;
   private final String prefix;
   private final long initialIndex;
 
   private final long checkpointGap;
 
-  public IndexController(String storageDir, String prefix, long initialIndex, long checkpointGap) {
+  public IndexController(String storageDir, Peer peer, long initialIndex, long checkpointGap) {
     this.storageDir = storageDir;
-    this.prefix = prefix + '-';
+    this.peer = peer;
+    this.prefix = peer.getNodeId() + SEPARATOR;
     this.checkpointGap = checkpointGap;
     this.initialIndex = initialIndex;
+    // This is because we changed the name of the version file in version 1.0.1. In order to ensure
+    // compatibility with version 1.0.0, we need to add this function. We will remove this function
+    // in the future version 2.x.
+    upgrade();
     restore();
   }
 
@@ -124,6 +137,30 @@ public class IndexController {
     }
   }
 
+  private void upgrade() {
+    File directory = new File(storageDir);
+    String oldPrefix = Utils.fromTEndPointToString(peer.getEndpoint()) + SEPARATOR;
+    Optional.ofNullable(directory.listFiles((dir, name) -> name.startsWith(oldPrefix)))
+        .ifPresent(
+            files ->
+                Arrays.stream(files)
+                    .forEach(
+                        oldFile -> {
+                          String[] splits = oldFile.getName().split(SEPARATOR);
+                          long fileVersion = Long.parseLong(splits[splits.length - 1]);
+                          File newFile = new File(storageDir, prefix + fileVersion);
+                          try {
+                            logger.info(
+                                "version file upgrade, previous: {}, current: {}",
+                                oldFile.getAbsolutePath(),
+                                newFile.getAbsolutePath());
+                            FileUtils.moveFile(oldFile, newFile);
+                          } catch (IOException e) {
+                            logger.error("Error occurred when upgrading version file", e);
+                          }
+                        }));
+  }
+
   private void restore() {
     File directory = new File(storageDir);
     File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
@@ -132,7 +169,7 @@ public class IndexController {
       long maxVersion = 0;
       int maxVersionIndex = 0;
       for (int i = 0; i < versionFiles.length; i++) {
-        long fileVersion = Long.parseLong(versionFiles[i].getName().split("-")[1]);
+        long fileVersion = Long.parseLong(versionFiles[i].getName().split(SEPARATOR)[1]);
         if (fileVersion > maxVersion) {
           maxVersion = fileVersion;
           maxVersionIndex = i;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c6eb05aa0a..c89e9f50fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
-import org.apache.iotdb.consensus.ratis.Utils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.thrift.TException;
@@ -204,7 +203,7 @@ public class LogDispatcher {
       this.controller =
           new IndexController(
               impl.getStorageDir(),
-              Utils.fromTEndPointToString(peer.getEndpoint()),
+              peer,
               initialSyncIndex,
               config.getReplication().getCheckpointGap());
       this.syncStatus = new SyncStatus(controller, config);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index 183c35e2ee..7051f47930 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
+
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -27,11 +32,14 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 
 public class IndexControllerTest {
 
   private static final File storageDir = new File("target" + java.io.File.separator + "test");
-  private static final String prefix = "version";
+
+  private static final Peer peer =
+      new Peer(new DataRegionId(1), 2, new TEndPoint("datanode-1.datanode-svc", 6667));
 
   private static final long CHECK_POINT_GAP = 500;
 
@@ -45,11 +53,10 @@ public class IndexControllerTest {
     FileUtils.deleteFully(storageDir);
   }
 
-  /** test indexController when incrementIntervalAfterRestart == false */
   @Test
-  public void testIncrementIntervalAfterRestart() {
+  public void testRestart() {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -58,7 +65,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -66,7 +73,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
@@ -74,7 +81,7 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+    controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
 
@@ -82,4 +89,20 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
   }
+
+  @Test
+  public void testUpgrade() throws IOException {
+    File oldFile =
+        new File(
+            storageDir,
+            Utils.fromTEndPointToString(peer.getEndpoint()) + IndexController.SEPARATOR + 100);
+    Files.createFile(oldFile.toPath());
+
+    IndexController controller =
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
+    Assert.assertEquals(100, controller.getCurrentIndex());
+
+    File newFile = new File(storageDir, peer.getNodeId() + IndexController.SEPARATOR + 100);
+    Assert.assertTrue(newFile.exists());
+  }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
index b0a85612f7..f9c78622db 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
 
@@ -38,7 +41,9 @@ import java.util.concurrent.ExecutionException;
 public class SyncStatusTest {
 
   private static final File storageDir = new File("target" + java.io.File.separator + "test");
-  private static final String prefix = "version";
+
+  private static final Peer peer =
+      new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667));
   private static final IoTConsensusConfig config = new IoTConsensusConfig.Builder().build();
   private static final long CHECK_POINT_GAP = 500;
 
@@ -56,7 +61,7 @@ public class SyncStatusTest {
   @Test
   public void sequenceTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
@@ -86,7 +91,7 @@ public class SyncStatusTest {
   @Test
   public void reverseTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -123,7 +128,7 @@ public class SyncStatusTest {
   @Test
   public void mixedTest() throws InterruptedException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -172,7 +177,7 @@ public class SyncStatusTest {
   @Test
   public void waitTest() throws InterruptedException, ExecutionException {
     IndexController controller =
-        new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+        new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);