You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/15 02:57:45 UTC
[iotdb] 01/01: [IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch jira5174
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a73a26ef0eec35a50ee76cad0b385c9c4a8992b0
Author: Potato <ta...@apache.org>
AuthorDate: Thu Dec 15 10:55:48 2022 +0800
[IOTDB-5174] Use filename format such as NodeID-Index rather than Endpoint-Index to track follower sync progress (#8458)
* change version filename && add compatibility solution/test
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
* fix review
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
* rename to uppercase
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
Co-authored-by: Jinrui.Zhang <xi...@gmail.com>
---
.../iot/logdispatcher/IndexController.java | 43 ++++++++++++++++++++--
.../consensus/iot/logdispatcher/LogDispatcher.java | 3 +-
.../iot/logdispatcher/IndexControllerTest.java | 37 +++++++++++++++----
.../iot/logdispatcher/SyncStatusTest.java | 15 +++++---
4 files changed, 81 insertions(+), 17 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index e5242c6611..7630849928 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -30,12 +32,16 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** An index controller class to balance the performance degradation of frequent disk I/O. */
@ThreadSafe
public class IndexController {
+ public static final String SEPARATOR = "-";
+
private final Logger logger = LoggerFactory.getLogger(IndexController.class);
private long lastFlushedIndex;
@@ -44,16 +50,23 @@ public class IndexController {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final String storageDir;
+
+ private final Peer peer;
private final String prefix;
private final long initialIndex;
private final long checkpointGap;
- public IndexController(String storageDir, String prefix, long initialIndex, long checkpointGap) {
+ public IndexController(String storageDir, Peer peer, long initialIndex, long checkpointGap) {
this.storageDir = storageDir;
- this.prefix = prefix + '-';
+ this.peer = peer;
+ this.prefix = peer.getNodeId() + SEPARATOR;
this.checkpointGap = checkpointGap;
this.initialIndex = initialIndex;
+ // This is because we changed the name of the version file in version 1.0.1. In order to ensure
+ // compatibility with version 1.0.0, we need to add this function. We will remove this function
+ // in the future version 2.x.
+ upgrade();
restore();
}
@@ -124,6 +137,30 @@ public class IndexController {
}
}
+ private void upgrade() {
+ File directory = new File(storageDir);
+ String oldPrefix = Utils.fromTEndPointToString(peer.getEndpoint()) + SEPARATOR;
+ Optional.ofNullable(directory.listFiles((dir, name) -> name.startsWith(oldPrefix)))
+ .ifPresent(
+ files ->
+ Arrays.stream(files)
+ .forEach(
+ oldFile -> {
+ String[] splits = oldFile.getName().split(SEPARATOR);
+ long fileVersion = Long.parseLong(splits[splits.length - 1]);
+ File newFile = new File(storageDir, prefix + fileVersion);
+ try {
+ logger.info(
+ "version file upgrade, previous: {}, current: {}",
+ oldFile.getAbsolutePath(),
+ newFile.getAbsolutePath());
+ FileUtils.moveFile(oldFile, newFile);
+ } catch (IOException e) {
+ logger.error("Error occurred when upgrading version file", e);
+ }
+ }));
+ }
+
private void restore() {
File directory = new File(storageDir);
File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(prefix));
@@ -132,7 +169,7 @@ public class IndexController {
long maxVersion = 0;
int maxVersionIndex = 0;
for (int i = 0; i < versionFiles.length; i++) {
- long fileVersion = Long.parseLong(versionFiles[i].getName().split("-")[1]);
+ long fileVersion = Long.parseLong(versionFiles[i].getName().split(SEPARATOR)[1]);
if (fileVersion > maxVersion) {
maxVersion = fileVersion;
maxVersionIndex = i;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c6eb05aa0a..c89e9f50fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
-import org.apache.iotdb.consensus.ratis.Utils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.thrift.TException;
@@ -204,7 +203,7 @@ public class LogDispatcher {
this.controller =
new IndexController(
impl.getStorageDir(),
- Utils.fromTEndPointToString(peer.getEndpoint()),
+ peer,
initialSyncIndex,
config.getReplication().getCheckpointGap());
this.syncStatus = new SyncStatus(controller, config);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index 183c35e2ee..7051f47930 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.consensus.iot.logdispatcher;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.ratis.Utils;
+
import org.apache.ratis.util.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -27,11 +32,14 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
public class IndexControllerTest {
private static final File storageDir = new File("target" + java.io.File.separator + "test");
- private static final String prefix = "version";
+
+ private static final Peer peer =
+ new Peer(new DataRegionId(1), 2, new TEndPoint("datanode-1.datanode-svc", 6667));
private static final long CHECK_POINT_GAP = 500;
@@ -45,11 +53,10 @@ public class IndexControllerTest {
FileUtils.deleteFully(storageDir);
}
- /** test indexController when incrementIntervalAfterRestart == false */
@Test
- public void testIncrementIntervalAfterRestart() {
+ public void testRestart() {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -58,7 +65,7 @@ public class IndexControllerTest {
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -66,7 +73,7 @@ public class IndexControllerTest {
Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
@@ -74,7 +81,7 @@ public class IndexControllerTest {
Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
- controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
@@ -82,4 +89,20 @@ public class IndexControllerTest {
Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
}
+
+ @Test
+ public void testUpgrade() throws IOException {
+ File oldFile =
+ new File(
+ storageDir,
+ Utils.fromTEndPointToString(peer.getEndpoint()) + IndexController.SEPARATOR + 100);
+ Files.createFile(oldFile.toPath());
+
+ IndexController controller =
+ new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
+ Assert.assertEquals(100, controller.getCurrentIndex());
+
+ File newFile = new File(storageDir, peer.getNodeId() + IndexController.SEPARATOR + 100);
+ Assert.assertTrue(newFile.exists());
+ }
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
index b0a85612f7..f9c78622db 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.consensus.iot.logdispatcher;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.thrift.TLogBatch;
@@ -38,7 +41,9 @@ import java.util.concurrent.ExecutionException;
public class SyncStatusTest {
private static final File storageDir = new File("target" + java.io.File.separator + "test");
- private static final String prefix = "version";
+
+ private static final Peer peer =
+ new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667));
private static final IoTConsensusConfig config = new IoTConsensusConfig.Builder().build();
private static final long CHECK_POINT_GAP = 500;
@@ -56,7 +61,7 @@ public class SyncStatusTest {
@Test
public void sequenceTest() throws InterruptedException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);
@@ -86,7 +91,7 @@ public class SyncStatusTest {
@Test
public void reverseTest() throws InterruptedException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -123,7 +128,7 @@ public class SyncStatusTest {
@Test
public void mixedTest() throws InterruptedException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -172,7 +177,7 @@ public class SyncStatusTest {
@Test
public void waitTest() throws InterruptedException, ExecutionException {
IndexController controller =
- new IndexController(storageDir.getAbsolutePath(), prefix, 0, CHECK_POINT_GAP);
+ new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
SyncStatus status = new SyncStatus(controller, config);