You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/09 02:29:39 UTC

[iotdb] branch master updated: [IOTDB-5479] Optimize the wal impl of confignode simple consensus (#8997)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd6d3d9fa [IOTDB-5479] Optimize the wal impl of confignode simple consensus (#8997)
dfd6d3d9fa is described below

commit dfd6d3d9faab2135be3d3d920ae78b6395b27e9c
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Thu Feb 9 10:29:32 2023 +0800

    [IOTDB-5479] Optimize the wal impl of confignode simple consensus (#8997)
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 10 ++++
 .../confignode/conf/ConfigNodeDescriptor.java      |  8 +++
 .../statemachine/ConfigNodeRegionStateMachine.java | 70 +++++++++++++---------
 3 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 4146fda59a..03442f59a6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -287,6 +287,8 @@ public class ConfigNodeConfig {
   /** The getOrCreatePartitionTable interface will log new created Partition if set true */
   private boolean isEnablePrintingNewlyCreatedPartition = false;
 
+  private long forceWalPeriodForConfigNodeSimpleInMs = 100;
+
   public ConfigNodeConfig() {
     // empty constructor
   }
@@ -1100,4 +1102,12 @@ public class ConfigNodeConfig {
   public void setEnablePrintingNewlyCreatedPartition(boolean enablePrintingNewlyCreatedPartition) {
     isEnablePrintingNewlyCreatedPartition = enablePrintingNewlyCreatedPartition;
   }
+
+  public long getForceWalPeriodForConfigNodeSimpleInMs() {
+    return forceWalPeriodForConfigNodeSimpleInMs;
+  }
+
+  public void setForceWalPeriodForConfigNodeSimpleInMs(long forceWalPeriodForConfigNodeSimpleInMs) {
+    this.forceWalPeriodForConfigNodeSimpleInMs = forceWalPeriodForConfigNodeSimpleInMs;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 178e65afab..1bcfa488f2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -764,6 +764,14 @@ public class ConfigNodeDescriptor {
                     "enable_printing_newly_created_partition",
                     String.valueOf(conf.isEnablePrintingNewlyCreatedPartition()))
                 .trim()));
+
+    conf.setForceWalPeriodForConfigNodeSimpleInMs(
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "force_wal_period_for_confignode_simple_in_ms",
+                    String.valueOf(conf.getForceWalPeriodForConfigNodeSimpleInMs()))
+                .trim()));
   }
 
   private void loadCQConfig(Properties properties) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
index 16d75de30b..97014df506 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigNodeRegionStateMachine.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -51,6 +52,7 @@ import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /** StateMachine for ConfigNodeRegion */
@@ -64,8 +66,11 @@ public class ConfigNodeRegionStateMachine
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
-  private LogWriter logWriter;
-  private File logFile;
+
+  /** Variables for ConfigNode Simple Consensus */
+  private LogWriter simpleLogWriter;
+
+  private File simpleLogFile;
   private int startIndex;
   private int endIndex;
 
@@ -269,21 +274,22 @@ public class ConfigNodeRegionStateMachine
 
   /** TODO optimize the lock usage */
   private synchronized void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
-    if (logFile.length() > LOG_FILE_MAX_SIZE) {
+    if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
       try {
-        logWriter.force();
+        simpleLogWriter.force();
         File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
-        Files.move(logFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
+        Files.move(
+            simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
       } catch (IOException e) {
         LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
       }
       for (int retry = 0; retry < 5; retry++) {
         try {
-          logWriter.close();
+          simpleLogWriter.close();
         } catch (IOException e) {
           LOGGER.warn(
               "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, filePath: {}, retry: {}",
-              logFile.getAbsolutePath(),
+              simpleLogFile.getAbsolutePath(),
               retry);
           try {
             // Sleep 1s and retry
@@ -302,15 +308,10 @@ public class ConfigNodeRegionStateMachine
 
     try {
       ByteBuffer buffer = plan.serializeToByteBuffer();
-      // The method logWriter.write will execute flip() firstly, so we must make position==limit
       buffer.position(buffer.limit());
-      logWriter.write(buffer);
-      logWriter.close();
+      simpleLogWriter.write(buffer);
+
       endIndex = endIndex + 1;
-      File tmpLogFile = new File(PROGRESS_FILE_PATH + endIndex);
-      Files.move(logFile.toPath(), tmpLogFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
-      logFile = tmpLogFile;
-      logWriter = new LogWriter(logFile, false);
     } catch (Exception e) {
       LOGGER.error(
           "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
@@ -323,14 +324,6 @@ public class ConfigNodeRegionStateMachine
     String[] list = new File(CURRENT_FILE_DIR).list();
     if (list != null && list.length != 0) {
       for (String logFileName : list) {
-        int tmp = Integer.parseInt(logFileName.substring(logFileName.lastIndexOf("_") + 1));
-        if (logFileName.startsWith("log_inprogress")) {
-          endIndex = tmp;
-        } else {
-          if (startIndex < tmp) {
-            startIndex = tmp;
-          }
-        }
         File logFile =
             SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName);
         SingleFileLogReader logReader;
@@ -343,7 +336,10 @@ public class ConfigNodeRegionStateMachine
               e);
           continue;
         }
+
+        startIndex = endIndex;
         while (logReader.hasNext()) {
+          endIndex++;
           // read and re-serialize the PhysicalPlan
           ConfigPhysicalPlan nextPlan = logReader.next();
           try {
@@ -360,18 +356,38 @@ public class ConfigNodeRegionStateMachine
     }
     startIndex = startIndex + 1;
     createLogFile(endIndex);
+
+    ScheduledExecutorService simpleConsensusThread =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            "ConfigNode-Simple-Consensus-WAL-Flush-Thread");
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        simpleConsensusThread,
+        this::flushWALForSimpleConsensus,
+        0,
+        CONF.getForceWalPeriodForConfigNodeSimpleInMs(),
+        TimeUnit.MILLISECONDS);
+  }
+
+  private void flushWALForSimpleConsensus() {
+    if (simpleLogWriter != null) {
+      try {
+        simpleLogWriter.force();
+      } catch (IOException e) {
+        LOGGER.error("Can't force logWriter for ConfigNode flushWALForSimpleConsensus", e);
+      }
+    }
   }
 
   private void createLogFile(int endIndex) {
-    logFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH + endIndex);
+    simpleLogFile = SystemFileFactory.INSTANCE.getFile(PROGRESS_FILE_PATH + endIndex);
     try {
-      logFile.createNewFile();
-      logWriter = new LogWriter(logFile, false);
-      LOGGER.info("Create ConfigNode SimpleConsensusFile: {}", logFile.getAbsolutePath());
+      simpleLogFile.createNewFile();
+      simpleLogWriter = new LogWriter(simpleLogFile, false);
+      LOGGER.info("Create ConfigNode SimpleConsensusFile: {}", simpleLogFile.getAbsolutePath());
     } catch (Exception e) {
       LOGGER.warn(
           "Create ConfigNode SimpleConsensusFile failed, filePath: {}",
-          logFile.getAbsolutePath(),
+          simpleLogFile.getAbsolutePath(),
           e);
     }
   }