You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/31 06:31:39 UTC
[iotdb] 02/02: add init method for PartitionRegionStateMachine
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 067bafa820b9bedf42ccdea6b32c3597dfa33e81
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Oct 31 14:31:23 2022 +0800
add init method for PartitionRegionStateMachine
---
.../statemachine/PartitionRegionStateMachine.java | 48 +++++++++++++++++++++-
1 file changed, 47 insertions(+), 1 deletion(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index f6b940d548..f7c9a5b2ad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -120,6 +122,46 @@ public class PartitionRegionStateMachine
result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
+ if (ConsensusFactory.StandAloneConsensus.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+ if (logFile.length() > FILE_MAX_SIZE) {
+ try {
+ logWriter.force();
+ } catch (IOException e) {
+ LOGGER.error("Can't force logWrite for ConfigNode Standalone mode", e);
+ }
+ for (int retry = 0; retry < 5; retry++) {
+ try {
+ logWriter.close();
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Can't close StandAloneLog for ConfigNode Standalone mode, filePath: {}, retry: {}",
+ logFile.getAbsolutePath(),
+ retry);
+ try {
+ // Sleep 1s and retry
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Unexpected interruption during the close method of logWriter");
+ }
+ continue;
+ }
+ break;
+ }
+ createLogFile(logFileId + 1);
+ }
+
+ try {
+ ByteBuffer buffer = plan.serializeToByteBuffer();
+ // The method logWriter.write will execute flip() firstly, so we must make position==limit
+ buffer.position(buffer.limit());
+ logWriter.write(buffer);
+ } catch (IOException e) {
+ LOGGER.error(
+ "can't serialize current ConfigPhysicalPlan for ConfigNode Standalone mode", e);
+ }
+ }
+
return result;
}
@@ -212,7 +254,11 @@ public class PartitionRegionStateMachine
}
@Override
- public void start() {}
+ public void start() {
+ if (ConsensusFactory.StandAloneConsensus.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+ initStandAloneConfigNode();
+ }
+ }
@Override
public void stop() {