You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/31 06:31:39 UTC

[iotdb] 02/02: add init method for PartitionRegionStateMachine

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 067bafa820b9bedf42ccdea6b32c3597dfa33e81
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Oct 31 14:31:23 2022 +0800

    add init method for PartitionRegionStateMachine
---
 .../statemachine/PartitionRegionStateMachine.java  | 48 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index f6b940d548..f7c9a5b2ad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -120,6 +122,46 @@ public class PartitionRegionStateMachine
       result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
 
+    if (ConsensusFactory.StandAloneConsensus.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      if (logFile.length() > FILE_MAX_SIZE) {
+        try {
+          logWriter.force();
+        } catch (IOException e) {
+          LOGGER.error("Can't force logWrite for ConfigNode Standalone mode", e);
+        }
+        for (int retry = 0; retry < 5; retry++) {
+          try {
+            logWriter.close();
+          } catch (IOException e) {
+            LOGGER.warn(
+                "Can't close StandAloneLog for ConfigNode Standalone mode, filePath: {}, retry: {}",
+                logFile.getAbsolutePath(),
+                retry);
+            try {
+              // Sleep 1s and retry
+              TimeUnit.SECONDS.sleep(1);
+            } catch (InterruptedException e2) {
+              Thread.currentThread().interrupt();
+              LOGGER.warn("Unexpected interruption during the close method of logWriter");
+            }
+            continue;
+          }
+          break;
+        }
+        createLogFile(logFileId + 1);
+      }
+
+      try {
+        ByteBuffer buffer = plan.serializeToByteBuffer();
+        // The method logWriter.write will execute flip() firstly, so we must make position==limit
+        buffer.position(buffer.limit());
+        logWriter.write(buffer);
+      } catch (IOException e) {
+        LOGGER.error(
+            "can't serialize current ConfigPhysicalPlan for ConfigNode Standalone mode", e);
+      }
+    }
+
     return result;
   }
 
@@ -212,7 +254,11 @@ public class PartitionRegionStateMachine
   }
 
   @Override
-  public void start() {}
+  public void start() {
+    if (ConsensusFactory.StandAloneConsensus.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      initStandAloneConfigNode();
+    }
+  }
 
   @Override
   public void stop() {