You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/03 06:17:11 UTC
[iotdb] branch master updated: [IOTDB-4616] Support snapshot serialization and deserialization for confignode standalone mode (#7874)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ae83e89676 [IOTDB-4616] Support snapshot serialization and deserialization for confignode standalone mode (#7874)
ae83e89676 is described below
commit ae83e89676f8406febdd345d981541dfbfc93ef5
Author: ljn55966005 <32...@users.noreply.github.com>
AuthorDate: Thu Nov 3 14:17:05 2022 +0800
[IOTDB-4616] Support snapshot serialization and deserialization for confignode standalone mode (#7874)
---
.../statemachine/PartitionRegionStateMachine.java | 128 ++++++++++++++++-----
.../iotdb/confignode/manager/ConsensusManager.java | 1 -
2 files changed, 99 insertions(+), 30 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 4fccc0d2e4..d8187b56ee 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -45,6 +46,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -61,10 +65,15 @@ public class PartitionRegionStateMachine
private ConfigManager configManager;
private LogWriter logWriter;
private File logFile;
- private int logFileId;
- private static final String fileDir = CONF.getConsensusDir();
- private static final String filePath = fileDir + File.separator + "log_inprogress_";
- private static final long FILE_MAX_SIZE = CONF.getPartitionRegionOneCopyLogSegmentSizeMax();
+ private int startIndex;
+ private int endIndex;
+
+ private static final String currentFileDir =
+ CONF.getConsensusDir() + File.separator + "standalone" + File.separator + "current";
+ private static final String progressFilePath =
+ currentFileDir + File.separator + "log_inprogress_";
+ private static final String filePath = currentFileDir + File.separator + "log_";
+ private static final long LOG_FILE_MAX_SIZE = CONF.getPartitionRegionOneCopyLogSegmentSizeMax();
private final TEndPoint currentNodeTEndPoint;
public PartitionRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) {
@@ -120,6 +129,9 @@ public class PartitionRegionStateMachine
result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
+ if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+ writeLogForSimpleConsensus(plan);
+ }
return result;
}
@@ -212,7 +224,11 @@ public class PartitionRegionStateMachine
}
@Override
- public void start() {}
+ public void start() {
+ if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+ initStandAloneConfigNode();
+ }
+ }
@Override
public void stop() {
@@ -242,17 +258,77 @@ public class PartitionRegionStateMachine
return RetryPolicy.super.getSleepTime();
}
+ /** TODO optimize the lock usage */
+ private synchronized void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
+ if (logFile.length() > LOG_FILE_MAX_SIZE) {
+ try {
+ logWriter.force();
+ File completedFilePath = new File(filePath + startIndex + "_" + endIndex);
+ Files.move(logFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ } catch (IOException e) {
+ LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
+ }
+ for (int retry = 0; retry < 5; retry++) {
+ try {
+ logWriter.close();
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, filePath: {}, retry: {}",
+ logFile.getAbsolutePath(),
+ retry);
+ try {
+ // Sleep 1s and retry
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Unexpected interruption during the close method of logWriter");
+ }
+ continue;
+ }
+ break;
+ }
+ startIndex = endIndex + 1;
+ createLogFile(startIndex);
+ }
+
+ try {
+ ByteBuffer buffer = plan.serializeToByteBuffer();
+ // The method logWriter.write will execute flip() firstly, so we must make position==limit
+ buffer.position(buffer.limit());
+ logWriter.write(buffer);
+ endIndex = endIndex + 1;
+ File tmpLogFile = new File(progressFilePath + endIndex);
+ Files.move(logFile.toPath(), tmpLogFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ logFile = tmpLogFile;
+ logWriter = new LogWriter(logFile, false);
+ } catch (Exception e) {
+ LOGGER.error(
+ "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
+ }
+ }
+
private void initStandAloneConfigNode() {
- String[] list = new File(fileDir).list();
+ File dir = new File(currentFileDir);
+ dir.mkdirs();
+ String[] list = new File(currentFileDir).list();
if (list != null && list.length != 0) {
for (String logFileName : list) {
- File logFile = SystemFileFactory.INSTANCE.getFile(fileDir + File.separator + logFileName);
+ int tmp = Integer.parseInt(logFileName.substring(logFileName.lastIndexOf("_") + 1));
+ if (logFileName.startsWith("log_inprogress")) {
+ endIndex = tmp;
+ } else {
+ if (startIndex < tmp) {
+ startIndex = tmp;
+ }
+ }
+ File logFile =
+ SystemFileFactory.INSTANCE.getFile(currentFileDir + File.separator + logFileName);
SingleFileLogReader logReader;
try {
logReader = new SingleFileLogReader(logFile);
} catch (FileNotFoundException e) {
LOGGER.error(
- "initStandAloneConfigNode meets error, can't find standalone log files, filePath: {}",
+ "InitStandAloneConfigNode meets error, can't find standalone log files, filePath: {}",
logFile.getAbsolutePath(),
e);
continue;
@@ -268,31 +344,25 @@ public class PartitionRegionStateMachine
}
logReader.close();
}
+ } else {
+ startIndex = 0;
+ endIndex = 0;
}
- for (int ID = 0; ID < Integer.MAX_VALUE; ID++) {
- File file = SystemFileFactory.INSTANCE.getFile(filePath);
- if (!file.exists()) {
- logFileId = ID;
- break;
- }
- }
- createLogFile(logFileId);
+ startIndex = startIndex + 1;
+ createLogFile(endIndex);
}
- private void createLogFile(int logFileId) {
- logFile = SystemFileFactory.INSTANCE.getFile(filePath + logFileId);
+ private void createLogFile(int endIndex) {
+ logFile = SystemFileFactory.INSTANCE.getFile(progressFilePath + endIndex);
try {
- if (logFile.createNewFile()) {
- logWriter = new LogWriter(logFile, false);
- LOGGER.info("Create StandaloneLog: {}", logFile.getAbsolutePath());
- }
- } catch (IOException e) {
- LOGGER.warn("Can't create StandaloneLog: {}, retrying...", logFile.getAbsolutePath());
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException ignored) {
- // Ignore and retry
- }
+ logFile.createNewFile();
+ logWriter = new LogWriter(logFile, false);
+ LOGGER.info("Create ConfigNode SimpleConsensusFile: {}", logFile.getAbsolutePath());
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Create ConfigNode SimpleConsensusFile failed, filePath: {}",
+ logFile.getAbsolutePath(),
+ e);
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 3aa494e078..9e86e91825 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -179,7 +179,6 @@ public class ConsensusManager {
CONF.getConfigNodeConsensusProtocolClass())));
}
consensusImpl.start();
-
if (SystemPropertiesUtils.isRestarted()) {
try {
// Create ConsensusGroup from confignode-system.properties file when restart