You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/03 06:17:11 UTC

[iotdb] branch master updated: [IOTDB-4616] Support snapshot serialization and deserialization for confignode standalone mode (#7874)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ae83e89676 [IOTDB-4616] Support snapshot serialization and deserialization for confignode standalone mode (#7874)
ae83e89676 is described below

commit ae83e89676f8406febdd345d981541dfbfc93ef5
Author: ljn55966005 <32...@users.noreply.github.com>
AuthorDate: Thu Nov 3 14:17:05 2022 +0800

    [IOTDB-4616] Support snapshot serialization and deserialization for confignode standalone mode (#7874)
---
 .../statemachine/PartitionRegionStateMachine.java  | 128 ++++++++++++++++-----
 .../iotdb/confignode/manager/ConsensusManager.java |   1 -
 2 files changed, 99 insertions(+), 30 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 4fccc0d2e4..d8187b56ee 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -45,6 +46,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -61,10 +65,15 @@ public class PartitionRegionStateMachine
   private ConfigManager configManager;
   private LogWriter logWriter;
   private File logFile;
-  private int logFileId;
-  private static final String fileDir = CONF.getConsensusDir();
-  private static final String filePath = fileDir + File.separator + "log_inprogress_";
-  private static final long FILE_MAX_SIZE = CONF.getPartitionRegionOneCopyLogSegmentSizeMax();
+  private int startIndex;
+  private int endIndex;
+
+  private static final String currentFileDir =
+      CONF.getConsensusDir() + File.separator + "standalone" + File.separator + "current";
+  private static final String progressFilePath =
+      currentFileDir + File.separator + "log_inprogress_";
+  private static final String filePath = currentFileDir + File.separator + "log_";
+  private static final long LOG_FILE_MAX_SIZE = CONF.getPartitionRegionOneCopyLogSegmentSizeMax();
   private final TEndPoint currentNodeTEndPoint;
 
   public PartitionRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) {
@@ -120,6 +129,9 @@ public class PartitionRegionStateMachine
       result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
 
+    if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      writeLogForSimpleConsensus(plan);
+    }
     return result;
   }
 
@@ -212,7 +224,11 @@ public class PartitionRegionStateMachine
   }
 
   @Override
-  public void start() {}
+  public void start() {
+    if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      initStandAloneConfigNode();
+    }
+  }
 
   @Override
   public void stop() {
@@ -242,17 +258,77 @@ public class PartitionRegionStateMachine
     return RetryPolicy.super.getSleepTime();
   }
 
+  /** TODO optimize the lock usage */
+  private synchronized void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
+    if (logFile.length() > LOG_FILE_MAX_SIZE) {
+      try {
+        logWriter.force();
+        File completedFilePath = new File(filePath + startIndex + "_" + endIndex);
+        Files.move(logFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
+      } catch (IOException e) {
+        LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
+      }
+      for (int retry = 0; retry < 5; retry++) {
+        try {
+          logWriter.close();
+        } catch (IOException e) {
+          LOGGER.warn(
+              "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, filePath: {}, retry: {}",
+              logFile.getAbsolutePath(),
+              retry);
+          try {
+            // Sleep 1s and retry
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException e2) {
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Unexpected interruption during the close method of logWriter");
+          }
+          continue;
+        }
+        break;
+      }
+      startIndex = endIndex + 1;
+      createLogFile(startIndex);
+    }
+
+    try {
+      ByteBuffer buffer = plan.serializeToByteBuffer();
+      // The method logWriter.write will execute flip() firstly, so we must make position==limit
+      buffer.position(buffer.limit());
+      logWriter.write(buffer);
+      endIndex = endIndex + 1;
+      File tmpLogFile = new File(progressFilePath + endIndex);
+      Files.move(logFile.toPath(), tmpLogFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
+      logFile = tmpLogFile;
+      logWriter = new LogWriter(logFile, false);
+    } catch (Exception e) {
+      LOGGER.error(
+          "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
+    }
+  }
+
   private void initStandAloneConfigNode() {
-    String[] list = new File(fileDir).list();
+    File dir = new File(currentFileDir);
+    dir.mkdirs();
+    String[] list = new File(currentFileDir).list();
     if (list != null && list.length != 0) {
       for (String logFileName : list) {
-        File logFile = SystemFileFactory.INSTANCE.getFile(fileDir + File.separator + logFileName);
+        int tmp = Integer.parseInt(logFileName.substring(logFileName.lastIndexOf("_") + 1));
+        if (logFileName.startsWith("log_inprogress")) {
+          endIndex = tmp;
+        } else {
+          if (startIndex < tmp) {
+            startIndex = tmp;
+          }
+        }
+        File logFile =
+            SystemFileFactory.INSTANCE.getFile(currentFileDir + File.separator + logFileName);
         SingleFileLogReader logReader;
         try {
           logReader = new SingleFileLogReader(logFile);
         } catch (FileNotFoundException e) {
           LOGGER.error(
-              "initStandAloneConfigNode meets error, can't find standalone log files, filePath: {}",
+              "InitStandAloneConfigNode meets error, can't find standalone log files, filePath: {}",
               logFile.getAbsolutePath(),
               e);
           continue;
@@ -268,31 +344,25 @@ public class PartitionRegionStateMachine
         }
         logReader.close();
       }
+    } else {
+      startIndex = 0;
+      endIndex = 0;
     }
-    for (int ID = 0; ID < Integer.MAX_VALUE; ID++) {
-      File file = SystemFileFactory.INSTANCE.getFile(filePath);
-      if (!file.exists()) {
-        logFileId = ID;
-        break;
-      }
-    }
-    createLogFile(logFileId);
+    startIndex = startIndex + 1;
+    createLogFile(endIndex);
   }
 
-  private void createLogFile(int logFileId) {
-    logFile = SystemFileFactory.INSTANCE.getFile(filePath + logFileId);
+  private void createLogFile(int endIndex) {
+    logFile = SystemFileFactory.INSTANCE.getFile(progressFilePath + endIndex);
     try {
-      if (logFile.createNewFile()) {
-        logWriter = new LogWriter(logFile, false);
-        LOGGER.info("Create StandaloneLog: {}", logFile.getAbsolutePath());
-      }
-    } catch (IOException e) {
-      LOGGER.warn("Can't create StandaloneLog: {}, retrying...", logFile.getAbsolutePath());
-      try {
-        TimeUnit.SECONDS.sleep(1);
-      } catch (InterruptedException ignored) {
-        // Ignore and retry
-      }
+      logFile.createNewFile();
+      logWriter = new LogWriter(logFile, false);
+      LOGGER.info("Create ConfigNode SimpleConsensusFile: {}", logFile.getAbsolutePath());
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Create ConfigNode SimpleConsensusFile failed, filePath: {}",
+          logFile.getAbsolutePath(),
+          e);
     }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 3aa494e078..9e86e91825 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -179,7 +179,6 @@ public class ConsensusManager {
                               CONF.getConfigNodeConsensusProtocolClass())));
     }
     consensusImpl.start();
-
     if (SystemPropertiesUtils.isRestarted()) {
       try {
         // Create ConsensusGroup from confignode-system.properties file when restart