You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/09 02:29:39 UTC
[iotdb] branch master updated: [IOTDB-5479] Optimize the wal impl of confignode simple consensus (#8997)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dfd6d3d9fa [IOTDB-5479] Optimize the wal impl of confignode simple consensus (#8997)
dfd6d3d9fa is described below
commit dfd6d3d9faab2135be3d3d920ae78b6395b27e9c
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Thu Feb 9 10:29:32 2023 +0800
[IOTDB-5479] Optimize the wal impl of confignode simple consensus (#8997)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 10 ++++
.../confignode/conf/ConfigNodeDescriptor.java | 8 +++
.../statemachine/ConfigNodeRegionStateMachine.java | 70 +++++++++++++---------
3 files changed, 61 insertions(+), 27 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 4146fda59a..03442f59a6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -287,6 +287,8 @@ public class ConfigNodeConfig {
/** The getOrCreatePartitionTable interface will log new created Partition if set true */
private boolean isEnablePrintingNewlyCreatedPartition = false;
+ private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+
public ConfigNodeConfig() {
// empty constructor
}
@@ -1100,4 +1102,12 @@ public class ConfigNodeConfig {
public void setEnablePrintingNewlyCreatedPartition(boolean enablePrintingNewlyCreatedPartition) {
isEnablePrintingNewlyCreatedPartition = enablePrintingNewlyCreatedPartition;
}
+
+ public long getForceWalPeriodForConfigNodeSimpleInMs() {
+ return forceWalPeriodForConfigNodeSimpleInMs;
+ }
+
+ public void setForceWalPeriodForConfigNodeSimpleInMs(long forceWalPeriodForConfigNodeSimpleInMs) {
+ this.forceWalPeriodForConfigNodeSimpleInMs = forceWalPeriodForConfigNodeSimpleInMs;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 178e65afab..1bcfa488f2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -764,6 +764,14 @@ public class ConfigNodeDescriptor {
"enable_printing_newly_created_partition",
String.valueOf(conf.isEnablePrintingNewlyCreatedPartition()))
.trim()));
+
+ conf.setForceWalPeriodForConfigNodeSimpleInMs(
+ Long.parseLong(
+ properties
+ .getProperty(
+ "force_wal_period_for_confignode_simple_in_ms",
+ String.valueOf(conf.getForceWalPeriodForConfigNodeSimpleInMs()))
+ .trim()));
}
private void loadCQConfig(Properties properties) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
index 16d75de30b..97014df506 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -51,6 +52,7 @@ import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** StateMachine for ConfigNodeRegion */
@@ -64,8 +66,11 @@ public class ConfigNodeRegionStateMachine
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private final ConfigPlanExecutor executor;
private ConfigManager configManager;
- private LogWriter logWriter;
- private File logFile;
+
+ /** Variables for ConfigNode Simple Consensus */
+ private LogWriter simpleLogWriter;
+
+ private File simpleLogFile;
private int startIndex;
private int endIndex;
@@ -269,21 +274,22 @@ public class ConfigNodeRegionStateMachine
/** TODO optimize the lock usage */
private synchronized void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
- if (logFile.length() > LOG_FILE_MAX_SIZE) {
+ if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
try {
- logWriter.force();
+ simpleLogWriter.force();
File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
- Files.move(logFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ Files.move(
+ simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
}
for (int retry = 0; retry < 5; retry++) {
try {
- logWriter.close();
+ simpleLogWriter.close();
} catch (IOException e) {
LOGGER.warn(
"Can't close StandAloneLog for ConfigNode SimpleConsensus mode, filePath: {}, retry: {}",
- logFile.getAbsolutePath(),
+ simpleLogFile.getAbsolutePath(),
retry);
try {
// Sleep 1s and retry
@@ -302,15 +308,10 @@ public class ConfigNodeRegionStateMachine
try {
ByteBuffer buffer = plan.serializeToByteBuffer();
- // The method logWriter.write will execute flip() firstly, so we must make position==limit
buffer.position(buffer.limit());
- logWriter.write(buffer);
- logWriter.close();
+ simpleLogWriter.write(buffer);
+
endIndex = endIndex + 1;
- File tmpLogFile = new File(PROGRESS_FILE_PATH + endIndex);
- Files.move(logFile.toPath(), tmpLogFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
- logFile = tmpLogFile;
- logWriter = new LogWriter(logFile, false);
} catch (Exception e) {
LOGGER.error(
"Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
@@ -323,14 +324,6 @@ public class ConfigNodeRegionStateMachine
String[] list = new File(CURRENT_FILE_DIR).list();
if (list != null && list.length != 0) {
for (String logFileName : list) {
- int tmp = Integer.parseInt(logFileName.substring(logFileName.lastIndexOf("_") + 1));
- if (logFileName.startsWith("log_inprogress")) {
- endIndex = tmp;
- } else {
- if (startIndex < tmp) {
- startIndex = tmp;
- }
- }
File logFile =
SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName);
SingleFileLogReader logReader;
@@ -343,7 +336,10 @@ public class ConfigNodeRegionStateMachine
e);
continue;
}
+
+ startIndex = endIndex;
while (logReader.hasNext()) {
+ endIndex++;
// read and re-serialize the PhysicalPlan
ConfigPhysicalPlan nextPlan = logReader.next();
try {
@@ -360,18 +356,38 @@ public class ConfigNodeRegionStateMachine
}
startIndex = startIndex + 1;
createLogFile(endIndex);
+
+ ScheduledExecutorService simpleConsensusThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ "ConfigNode-Simple-Consensus-WAL-Flush-Thread");
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ simpleConsensusThread,
+ this::flushWALForSimpleConsensus,
+ 0,
+ CONF.getForceWalPeriodForConfigNodeSimpleInMs(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void flushWALForSimpleConsensus() {
+ if (simpleLogWriter != null) {
+ try {
+ simpleLogWriter.force();
+ } catch (IOException e) {
+ LOGGER.error("Can't force logWriter for ConfigNode flushWALForSimpleConsensus", e);
+ }
+ }
}
private void createLogFile(int endIndex) {
- logFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH + endIndex);
+ simpleLogFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH + endIndex);
try {
- logFile.createNewFile();
- logWriter = new LogWriter(logFile, false);
- LOGGER.info("Create ConfigNode SimpleConsensusFile: {}", logFile.getAbsolutePath());
+ simpleLogFile.createNewFile();
+ simpleLogWriter = new LogWriter(simpleLogFile, false);
+ LOGGER.info("Create ConfigNode SimpleConsensusFile: {}", simpleLogFile.getAbsolutePath());
} catch (Exception e) {
LOGGER.warn(
"Create ConfigNode SimpleConsensusFile failed, filePath: {}",
- logFile.getAbsolutePath(),
+ simpleLogFile.getAbsolutePath(),
e);
}
}