You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/08/16 12:11:34 UTC

[iotdb] branch consensus_module_refactor updated (9ca59ee3b7d -> 3b82225a2d1)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch consensus_module_refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard 9ca59ee3b7d refactor data/schema region singleton
     new 3b82225a2d1 refactor data/schema region singleton

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (9ca59ee3b7d)
            \
             N -- N -- N   refs/heads/consensus_module_refactor (3b82225a2d1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impl/DataNodeInternalRPCServiceImpl.java       | 57 ++++++++++------------
 .../java/org/apache/iotdb/db/service/DataNode.java | 38 +++++++++------
 .../apache/iotdb/db/service/IoTDBShutdownHook.java | 11 ++---
 3 files changed, 54 insertions(+), 52 deletions(-)


[iotdb] 01/01: refactor data/schema region singleton

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch consensus_module_refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3b82225a2d1b46f485687311c7649508357142b0
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Wed Aug 16 20:06:03 2023 +0800

    refactor data/schema region singleton
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../db/consensus/DataRegionConsensusImpl.java      | 284 ++++++++++-----------
 .../db/consensus/SchemaRegionConsensusImpl.java    | 201 +++++++--------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  57 ++---
 .../java/org/apache/iotdb/db/service/DataNode.java |  38 +--
 .../apache/iotdb/db/service/IoTDBShutdownHook.java |  11 +-
 5 files changed, 288 insertions(+), 303 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 1e9ec1722d4..03963c8b359 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -48,162 +48,150 @@ import java.util.concurrent.TimeUnit;
  */
 public class DataRegionConsensusImpl {
 
-  private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-
-  private static IConsensus INSTANCE = null;
-
   private DataRegionConsensusImpl() {
     // do nothing
   }
 
-  // need to create instance before calling this method
   public static IConsensus getInstance() {
-    return INSTANCE;
+    return DataRegionConsensusImplHolder.INSTANCE;
   }
 
-  public static synchronized IConsensus setupAndGetInstance() {
-    if (INSTANCE == null) {
-      INSTANCE =
-          ConsensusFactory.getConsensusImpl(
-                  conf.getDataRegionConsensusProtocolClass(),
-                  ConsensusConfig.newBuilder()
-                      .setThisNodeId(conf.getDataNodeId())
-                      .setThisNode(
-                          new TEndPoint(
-                              conf.getInternalAddress(), conf.getDataRegionConsensusPort()))
-                      .setStorageDir(conf.getDataRegionConsensusDir())
-                      .setConsensusGroupType(TConsensusGroupType.DataRegion)
-                      .setIoTConsensusConfig(
-                          IoTConsensusConfig.newBuilder()
-                              .setRpc(
-                                  RPC.newBuilder()
-                                      .setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
-                                      .setRpcSelectorThreadNum(conf.getRpcSelectorThreadCount())
-                                      .setRpcMinConcurrentClientNum(
-                                          conf.getRpcMinConcurrentClientNum())
-                                      .setRpcMaxConcurrentClientNum(
-                                          conf.getRpcMaxConcurrentClientNum())
-                                      .setRpcThriftCompressionEnabled(
-                                          conf.isRpcThriftCompressionEnable())
-                                      .setSelectorNumOfClientManager(
-                                          conf.getSelectorNumOfClientManager())
-                                      .setThriftServerAwaitTimeForStopService(
-                                          conf.getThriftServerAwaitTimeForStopService())
-                                      .setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
-                                      .setCoreClientNumForEachNode(
-                                          conf.getCoreClientNumForEachNode())
-                                      .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                                      .build())
-                              .setReplication(
-                                  IoTConsensusConfig.Replication.newBuilder()
-                                      .setWalThrottleThreshold(conf.getThrottleThreshold())
-                                      .setAllocateMemoryForConsensus(
-                                          conf.getAllocateMemoryForConsensus())
-                                      .setMaxLogEntriesNumPerBatch(
-                                          conf.getMaxLogEntriesNumPerBatch())
-                                      .setMaxSizePerBatch(conf.getMaxSizePerBatch())
-                                      .setMaxPendingBatchesNum(conf.getMaxPendingBatchesNum())
-                                      .setMaxMemoryRatioForQueue(conf.getMaxMemoryRatioForQueue())
-                                      .build())
-                              .build())
-                      .setRatisConfig(
-                          RatisConfig.newBuilder()
-                              // An empty log is committed after each restart, even if no data is
-                              // written. This setting ensures that compaction work is not discarded
-                              // even if there are frequent restarts
-                              .setSnapshot(
-                                  Snapshot.newBuilder()
-                                      .setCreationGap(1)
-                                      .setAutoTriggerThreshold(
-                                          conf.getDataRatisConsensusSnapshotTriggerThreshold())
-                                      .build())
-                              .setLog(
-                                  RatisConfig.Log.newBuilder()
-                                      .setUnsafeFlushEnabled(
-                                          conf.isDataRatisConsensusLogUnsafeFlushEnable())
-                                      .setForceSyncNum(conf.getDataRatisConsensusLogForceSyncNum())
-                                      .setSegmentSizeMax(
-                                          SizeInBytes.valueOf(
-                                              conf.getDataRatisConsensusLogSegmentSizeMax()))
-                                      .setPreserveNumsWhenPurge(
-                                          conf.getDataRatisConsensusPreserveWhenPurge())
-                                      .build())
-                              .setGrpc(
-                                  RatisConfig.Grpc.newBuilder()
-                                      .setFlowControlWindow(
-                                          SizeInBytes.valueOf(
-                                              conf.getDataRatisConsensusGrpcFlowControlWindow()))
-                                      .setLeaderOutstandingAppendsMax(
-                                          conf
-                                              .getDataRatisConsensusGrpcLeaderOutstandingAppendsMax())
-                                      .build())
-                              .setRpc(
-                                  RatisConfig.Rpc.newBuilder()
-                                      .setTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  .getDataRatisConsensusLeaderElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  .getDataRatisConsensusLeaderElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setRequestTimeout(
-                                          TimeDuration.valueOf(
-                                              conf.getDataRatisConsensusRequestTimeoutMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              conf.getRatisFirstElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              conf.getRatisFirstElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .build())
-                              .setClient(
-                                  RatisConfig.Client.newBuilder()
-                                      .setClientRequestTimeoutMillis(
-                                          conf.getDataRatisConsensusRequestTimeoutMs())
-                                      .setClientMaxRetryAttempt(
-                                          conf.getDataRatisConsensusMaxRetryAttempts())
-                                      .setClientRetryInitialSleepTimeMs(
-                                          conf.getDataRatisConsensusInitialSleepTimeMs())
-                                      .setClientRetryMaxSleepTimeMs(
-                                          conf.getDataRatisConsensusMaxSleepTimeMs())
-                                      .setCoreClientNumForEachNode(
-                                          conf.getCoreClientNumForEachNode())
-                                      .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                                      .build())
-                              .setImpl(
-                                  RatisConfig.Impl.newBuilder()
-                                      .setTriggerSnapshotFileSize(conf.getDataRatisLogMax())
-                                      .build())
-                              .setLeaderLogAppender(
-                                  RatisConfig.LeaderLogAppender.newBuilder()
-                                      .setBufferByteLimit(
-                                          conf.getDataRatisConsensusLogAppenderBufferSizeMax())
-                                      .build())
-                              .build())
-                      .build(),
-                  DataRegionConsensusImpl::createDataRegionStateMachine)
-              .orElseThrow(
-                  () ->
-                      new IllegalArgumentException(
-                          String.format(
-                              ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                              conf.getDataRegionConsensusProtocolClass())));
-    }
-    return INSTANCE;
-  }
+  private static class DataRegionConsensusImplHolder {
+
+    private static final IoTDBConfig CONF = IoTDBDescriptor.getInstance().getConfig();
+
+    private static final IConsensus INSTANCE =
+        ConsensusFactory.getConsensusImpl(
+                CONF.getDataRegionConsensusProtocolClass(),
+                ConsensusConfig.newBuilder()
+                    .setThisNodeId(CONF.getDataNodeId())
+                    .setThisNode(
+                        new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort()))
+                    .setStorageDir(CONF.getDataRegionConsensusDir())
+                    .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                    .setIoTConsensusConfig(
+                        IoTConsensusConfig.newBuilder()
+                            .setRpc(
+                                RPC.newBuilder()
+                                    .setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
+                                    .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
+                                    .setRpcMinConcurrentClientNum(
+                                        CONF.getRpcMinConcurrentClientNum())
+                                    .setRpcMaxConcurrentClientNum(
+                                        CONF.getRpcMaxConcurrentClientNum())
+                                    .setRpcThriftCompressionEnabled(
+                                        CONF.isRpcThriftCompressionEnable())
+                                    .setSelectorNumOfClientManager(
+                                        CONF.getSelectorNumOfClientManager())
+                                    .setThriftServerAwaitTimeForStopService(
+                                        CONF.getThriftServerAwaitTimeForStopService())
+                                    .setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
+                                    .setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+                                    .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+                                    .build())
+                            .setReplication(
+                                IoTConsensusConfig.Replication.newBuilder()
+                                    .setWalThrottleThreshold(CONF.getThrottleThreshold())
+                                    .setAllocateMemoryForConsensus(
+                                        CONF.getAllocateMemoryForConsensus())
+                                    .setMaxLogEntriesNumPerBatch(CONF.getMaxLogEntriesNumPerBatch())
+                                    .setMaxSizePerBatch(CONF.getMaxSizePerBatch())
+                                    .setMaxPendingBatchesNum(CONF.getMaxPendingBatchesNum())
+                                    .setMaxMemoryRatioForQueue(CONF.getMaxMemoryRatioForQueue())
+                                    .build())
+                            .build())
+                    .setRatisConfig(
+                        RatisConfig.newBuilder()
+                            // An empty log is committed after each restart, even if no data is
+                            // written. This setting ensures that compaction work is not discarded
+                            // even if there are frequent restarts
+                            .setSnapshot(
+                                Snapshot.newBuilder()
+                                    .setCreationGap(1)
+                                    .setAutoTriggerThreshold(
+                                        CONF.getDataRatisConsensusSnapshotTriggerThreshold())
+                                    .build())
+                            .setLog(
+                                RatisConfig.Log.newBuilder()
+                                    .setUnsafeFlushEnabled(
+                                        CONF.isDataRatisConsensusLogUnsafeFlushEnable())
+                                    .setForceSyncNum(CONF.getDataRatisConsensusLogForceSyncNum())
+                                    .setSegmentSizeMax(
+                                        SizeInBytes.valueOf(
+                                            CONF.getDataRatisConsensusLogSegmentSizeMax()))
+                                    .setPreserveNumsWhenPurge(
+                                        CONF.getDataRatisConsensusPreserveWhenPurge())
+                                    .build())
+                            .setGrpc(
+                                RatisConfig.Grpc.newBuilder()
+                                    .setFlowControlWindow(
+                                        SizeInBytes.valueOf(
+                                            CONF.getDataRatisConsensusGrpcFlowControlWindow()))
+                                    .setLeaderOutstandingAppendsMax(
+                                        CONF.getDataRatisConsensusGrpcLeaderOutstandingAppendsMax())
+                                    .build())
+                            .setRpc(
+                                RatisConfig.Rpc.newBuilder()
+                                    .setTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            CONF.getDataRatisConsensusLeaderElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            CONF.getDataRatisConsensusLeaderElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setRequestTimeout(
+                                        TimeDuration.valueOf(
+                                            CONF.getDataRatisConsensusRequestTimeoutMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            CONF.getRatisFirstElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            CONF.getRatisFirstElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .build())
+                            .setClient(
+                                RatisConfig.Client.newBuilder()
+                                    .setClientRequestTimeoutMillis(
+                                        CONF.getDataRatisConsensusRequestTimeoutMs())
+                                    .setClientMaxRetryAttempt(
+                                        CONF.getDataRatisConsensusMaxRetryAttempts())
+                                    .setClientRetryInitialSleepTimeMs(
+                                        CONF.getDataRatisConsensusInitialSleepTimeMs())
+                                    .setClientRetryMaxSleepTimeMs(
+                                        CONF.getDataRatisConsensusMaxSleepTimeMs())
+                                    .setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+                                    .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+                                    .build())
+                            .setImpl(
+                                RatisConfig.Impl.newBuilder()
+                                    .setTriggerSnapshotFileSize(CONF.getDataRatisLogMax())
+                                    .build())
+                            .setLeaderLogAppender(
+                                RatisConfig.LeaderLogAppender.newBuilder()
+                                    .setBufferByteLimit(
+                                        CONF.getDataRatisConsensusLogAppenderBufferSizeMax())
+                                    .build())
+                            .build())
+                    .build(),
+                DataRegionConsensusImplHolder::createDataRegionStateMachine)
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            CONF.getDataRegionConsensusProtocolClass())));
 
-  private static DataRegionStateMachine createDataRegionStateMachine(ConsensusGroupId gid) {
-    DataRegion dataRegion = StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
-    if (ConsensusFactory.IOT_CONSENSUS.equals(conf.getDataRegionConsensusProtocolClass())) {
-      return new IoTConsensusDataRegionStateMachine(dataRegion);
-    } else {
-      return new DataRegionStateMachine(dataRegion);
+    private static DataRegionStateMachine createDataRegionStateMachine(ConsensusGroupId gid) {
+      DataRegion dataRegion = StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
+      if (ConsensusFactory.IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+        return new IoTConsensusDataRegionStateMachine(dataRegion);
+      } else {
+        return new DataRegionStateMachine(dataRegion);
+      }
     }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index a38b9dde4bc..0e39d28c66b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -42,112 +42,107 @@ import java.util.concurrent.TimeUnit;
  */
 public class SchemaRegionConsensusImpl {
 
-  private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
-
-  private static IConsensus INSTANCE = null;
-
-  private SchemaRegionConsensusImpl() {}
+  private SchemaRegionConsensusImpl() {
+    // do nothing
+  }
 
-  // need to create instance before calling this method
   public static IConsensus getInstance() {
-    return INSTANCE;
+    return SchemaRegionConsensusImplHolder.INSTANCE;
   }
 
-  public static synchronized IConsensus setupAndGetInstance() {
-    if (INSTANCE == null) {
-      INSTANCE =
-          ConsensusFactory.getConsensusImpl(
-                  conf.getSchemaRegionConsensusProtocolClass(),
-                  ConsensusConfig.newBuilder()
-                      .setThisNodeId(conf.getDataNodeId())
-                      .setThisNode(
-                          new TEndPoint(
-                              conf.getInternalAddress(), conf.getSchemaRegionConsensusPort()))
-                      .setConsensusGroupType(TConsensusGroupType.SchemaRegion)
-                      .setRatisConfig(
-                          RatisConfig.newBuilder()
-                              .setSnapshot(
-                                  RatisConfig.Snapshot.newBuilder()
-                                      .setAutoTriggerThreshold(
-                                          conf.getSchemaRatisConsensusSnapshotTriggerThreshold())
-                                      .build())
-                              .setLog(
-                                  RatisConfig.Log.newBuilder()
-                                      .setUnsafeFlushEnabled(
-                                          conf.isSchemaRatisConsensusLogUnsafeFlushEnable())
-                                      .setSegmentSizeMax(
-                                          SizeInBytes.valueOf(
-                                              conf.getSchemaRatisConsensusLogSegmentSizeMax()))
-                                      .setPreserveNumsWhenPurge(
-                                          conf.getSchemaRatisConsensusPreserveWhenPurge())
-                                      .build())
-                              .setGrpc(
-                                  RatisConfig.Grpc.newBuilder()
-                                      .setFlowControlWindow(
-                                          SizeInBytes.valueOf(
-                                              conf.getSchemaRatisConsensusGrpcFlowControlWindow()))
-                                      .build())
-                              .setRpc(
-                                  RatisConfig.Rpc.newBuilder()
-                                      .setTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  .getSchemaRatisConsensusLeaderElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              conf
-                                                  .getSchemaRatisConsensusLeaderElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setRequestTimeout(
-                                          TimeDuration.valueOf(
-                                              conf.getSchemaRatisConsensusRequestTimeoutMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMin(
-                                          TimeDuration.valueOf(
-                                              conf.getRatisFirstElectionTimeoutMinMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .setFirstElectionTimeoutMax(
-                                          TimeDuration.valueOf(
-                                              conf.getRatisFirstElectionTimeoutMaxMs(),
-                                              TimeUnit.MILLISECONDS))
-                                      .build())
-                              .setClient(
-                                  RatisConfig.Client.newBuilder()
-                                      .setClientRequestTimeoutMillis(
-                                          conf.getDataRatisConsensusRequestTimeoutMs())
-                                      .setClientMaxRetryAttempt(
-                                          conf.getDataRatisConsensusMaxRetryAttempts())
-                                      .setClientRetryInitialSleepTimeMs(
-                                          conf.getDataRatisConsensusInitialSleepTimeMs())
-                                      .setClientRetryMaxSleepTimeMs(
-                                          conf.getDataRatisConsensusMaxSleepTimeMs())
-                                      .setCoreClientNumForEachNode(
-                                          conf.getCoreClientNumForEachNode())
-                                      .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                                      .build())
-                              .setImpl(
-                                  RatisConfig.Impl.newBuilder()
-                                      .setTriggerSnapshotFileSize(conf.getSchemaRatisLogMax())
-                                      .build())
-                              .setLeaderLogAppender(
-                                  RatisConfig.LeaderLogAppender.newBuilder()
-                                      .setBufferByteLimit(
-                                          conf.getSchemaRatisConsensusLogAppenderBufferSizeMax())
-                                      .build())
-                              .build())
-                      .setStorageDir(conf.getSchemaRegionConsensusDir())
-                      .build(),
-                  gid ->
-                      new SchemaRegionStateMachine(
-                          SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
-              .orElseThrow(
-                  () ->
-                      new IllegalArgumentException(
-                          String.format(
-                              ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                              conf.getSchemaRegionConsensusProtocolClass())));
-    }
-    return INSTANCE;
+  private static class SchemaRegionConsensusImplHolder {
+
+    private static final IoTDBConfig CONF = IoTDBDescriptor.getInstance().getConfig();
+    private static final IConsensus INSTANCE =
+        ConsensusFactory.getConsensusImpl(
+                CONF.getSchemaRegionConsensusProtocolClass(),
+                ConsensusConfig.newBuilder()
+                    .setThisNodeId(CONF.getDataNodeId())
+                    .setThisNode(
+                        new TEndPoint(
+                            CONF.getInternalAddress(), CONF.getSchemaRegionConsensusPort()))
+                    .setConsensusGroupType(TConsensusGroupType.SchemaRegion)
+                    .setRatisConfig(
+                        RatisConfig.newBuilder()
+                            .setSnapshot(
+                                RatisConfig.Snapshot.newBuilder()
+                                    .setAutoTriggerThreshold(
+                                        CONF.getSchemaRatisConsensusSnapshotTriggerThreshold())
+                                    .build())
+                            .setLog(
+                                RatisConfig.Log.newBuilder()
+                                    .setUnsafeFlushEnabled(
+                                        CONF.isSchemaRatisConsensusLogUnsafeFlushEnable())
+                                    .setSegmentSizeMax(
+                                        SizeInBytes.valueOf(
+                                            CONF.getSchemaRatisConsensusLogSegmentSizeMax()))
+                                    .setPreserveNumsWhenPurge(
+                                        CONF.getSchemaRatisConsensusPreserveWhenPurge())
+                                    .build())
+                            .setGrpc(
+                                RatisConfig.Grpc.newBuilder()
+                                    .setFlowControlWindow(
+                                        SizeInBytes.valueOf(
+                                            CONF.getSchemaRatisConsensusGrpcFlowControlWindow()))
+                                    .build())
+                            .setRpc(
+                                RatisConfig.Rpc.newBuilder()
+                                    .setTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            CONF
+                                                .getSchemaRatisConsensusLeaderElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            CONF
+                                                .getSchemaRatisConsensusLeaderElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setRequestTimeout(
+                                        TimeDuration.valueOf(
+                                            CONF.getSchemaRatisConsensusRequestTimeoutMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMin(
+                                        TimeDuration.valueOf(
+                                            CONF.getRatisFirstElectionTimeoutMinMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .setFirstElectionTimeoutMax(
+                                        TimeDuration.valueOf(
+                                            CONF.getRatisFirstElectionTimeoutMaxMs(),
+                                            TimeUnit.MILLISECONDS))
+                                    .build())
+                            .setClient(
+                                RatisConfig.Client.newBuilder()
+                                    .setClientRequestTimeoutMillis(
+                                        CONF.getDataRatisConsensusRequestTimeoutMs())
+                                    .setClientMaxRetryAttempt(
+                                        CONF.getDataRatisConsensusMaxRetryAttempts())
+                                    .setClientRetryInitialSleepTimeMs(
+                                        CONF.getDataRatisConsensusInitialSleepTimeMs())
+                                    .setClientRetryMaxSleepTimeMs(
+                                        CONF.getDataRatisConsensusMaxSleepTimeMs())
+                                    .setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
+                                    .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
+                                    .build())
+                            .setImpl(
+                                RatisConfig.Impl.newBuilder()
+                                    .setTriggerSnapshotFileSize(CONF.getSchemaRatisLogMax())
+                                    .build())
+                            .setLeaderLogAppender(
+                                RatisConfig.LeaderLogAppender.newBuilder()
+                                    .setBufferByteLimit(
+                                        CONF.getSchemaRatisConsensusLogAppenderBufferSizeMax())
+                                    .build())
+                            .build())
+                    .setStorageDir(CONF.getSchemaRegionConsensusDir())
+                    .build(),
+                gid ->
+                    new SchemaRegionStateMachine(
+                        SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) gid)))
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            CONF.getSchemaRegionConsensusProtocolClass())));
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ff606c63a32..8c2d1be433c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -944,7 +944,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
       return exceptionMessages.isEmpty()
           ? new TPushPipeMetaResp()
-              .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
+          .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
           : new TPushPipeMetaResp()
               .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
               .setExceptionMessages(exceptionMessages);
@@ -1195,25 +1195,22 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
     Map<TConsensusGroupId, Boolean> result = new HashMap<>();
-    if (DataRegionConsensusImpl.getInstance() != null) {
-      DataRegionConsensusImpl.getInstance()
-          .getAllConsensusGroupIds()
-          .forEach(
-              groupId ->
-                  result.put(
-                      groupId.convertToTConsensusGroupId(),
-                      DataRegionConsensusImpl.getInstance().isLeader(groupId)));
-    }
-
-    if (SchemaRegionConsensusImpl.getInstance() != null) {
-      SchemaRegionConsensusImpl.getInstance()
-          .getAllConsensusGroupIds()
-          .forEach(
-              groupId ->
-                  result.put(
-                      groupId.convertToTConsensusGroupId(),
-                      SchemaRegionConsensusImpl.getInstance().isLeader(groupId)));
-    }
+    DataRegionConsensusImpl.getInstance()
+        .getAllConsensusGroupIds()
+        .forEach(
+            groupId ->
+                result.put(
+                    groupId.convertToTConsensusGroupId(),
+                    DataRegionConsensusImpl.getInstance().isLeader(groupId)));
+
+    SchemaRegionConsensusImpl.getInstance()
+        .getAllConsensusGroupIds()
+        .forEach(
+            groupId ->
+                result.put(
+                    groupId.convertToTConsensusGroupId(),
+                    SchemaRegionConsensusImpl.getInstance().isLeader(groupId)));
+
     return result;
   }
 
@@ -1805,16 +1802,16 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     // kill the datanode process 20 seconds later
     // because datanode process cannot exit normally for the reason of InterruptedException
     new Thread(
-            () -> {
-              try {
-                TimeUnit.SECONDS.sleep(20);
-              } catch (InterruptedException e) {
-                LOGGER.warn("Meets InterruptedException in stopDataNode RPC method");
-              } finally {
-                LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds");
-                System.exit(0);
-              }
-            })
+        () -> {
+          try {
+            TimeUnit.SECONDS.sleep(20);
+          } catch (InterruptedException e) {
+            LOGGER.warn("Meets InterruptedException in stopDataNode RPC method");
+          } finally {
+            LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds");
+            System.exit(0);
+          }
+        })
         .start();
 
     try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 4e05677c840..d89e9bb0480 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -128,11 +128,15 @@ public class DataNode implements DataNodeMBean {
 
   private final TEndPoint thisNode = new TEndPoint();
 
-  /** Hold the information of trigger, udf...... */
+  /**
+   * Hold the information of trigger, udf......
+   */
   private final ResourcesInformationHolder resourcesInformationHolder =
       new ResourcesInformationHolder();
 
-  /** Responsible for keeping trigger information up to date. */
+  /**
+   * Responsible for keeping trigger information up to date.
+   */
   private final TriggerInformationUpdater triggerInformationUpdater =
       new TriggerInformationUpdater();
 
@@ -203,7 +207,9 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  /** Prepare cluster IoTDB-DataNode */
+  /**
+   * Prepare cluster IoTDB-DataNode
+   */
   private boolean prepareDataNode() throws StartupException, IOException {
     // Set cluster mode
     config.setClusterMode(true);
@@ -348,7 +354,7 @@ public class DataNode implements DataNodeMBean {
    * Register this DataNode into cluster.
    *
    * @throws StartupException if register failed.
-   * @throws IOException if serialize cluster name and dataNode Id failed.
+   * @throws IOException      if serialize cluster name and dataNode Id failed.
    */
   private void sendRegisterRequestToConfigNode() throws StartupException, IOException {
     logger.info("Sending register request to ConfigNode-leader...");
@@ -484,8 +490,8 @@ public class DataNode implements DataNodeMBean {
     logger.info("IoTDB DataNode has started.");
 
     try {
-      SchemaRegionConsensusImpl.setupAndGetInstance().start();
-      DataRegionConsensusImpl.setupAndGetInstance().start();
+      SchemaRegionConsensusImpl.getInstance().start();
+      DataRegionConsensusImpl.getInstance().start();
     } catch (IOException e) {
       throw new StartupException(e);
     }
@@ -552,7 +558,9 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(PipeAgent.runtime());
   }
 
-  /** Set up RPC and protocols after DataNode is available */
+  /**
+   * Set up RPC and protocols after DataNode is available
+   */
   private void setUpRPCService() throws StartupException {
     // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
     registerManager.register(DataNodeInternalRPCService.getInstance());
@@ -697,7 +705,9 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  /** Generate a list for UDFs that do not have jar on this node. */
+  /**
+   * Generate a list for UDFs that do not have jar on this node.
+   */
   private List<UDFInformation> getJarListForUDF() {
     List<UDFInformation> res = new ArrayList<>();
     for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
@@ -810,7 +820,9 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  /** Generate a list for triggers that do not have jar on this node. */
+  /**
+   * Generate a list for triggers that do not have jar on this node.
+   */
   private List<TriggerInformation> getJarListForTrigger() {
     List<TriggerInformation> res = new ArrayList<>();
     for (TriggerInformation triggerInformation :
@@ -872,12 +884,8 @@ public class DataNode implements DataNodeMBean {
 
     try {
       MetricService.getInstance().stop();
-      if (SchemaRegionConsensusImpl.getInstance() != null) {
-        SchemaRegionConsensusImpl.getInstance().stop();
-      }
-      if (DataRegionConsensusImpl.getInstance() != null) {
-        DataRegionConsensusImpl.getInstance().stop();
-      }
+      SchemaRegionConsensusImpl.getInstance().stop();
+      DataRegionConsensusImpl.getInstance().stop();
     } catch (Exception e) {
       logger.error("Stop data node error", e);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index 4aac30f3e10..8cd6673f3c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.service;
 
+import java.io.IOException;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -88,13 +89,9 @@ public class IoTDBShutdownHook extends Thread {
 
     // close consensusImpl
     try {
-      if (SchemaRegionConsensusImpl.getInstance() != null) {
-        SchemaRegionConsensusImpl.getInstance().stop();
-      }
-      if (DataRegionConsensusImpl.getInstance() != null) {
-        DataRegionConsensusImpl.getInstance().stop();
-      }
-    } catch (Exception e) {
+      SchemaRegionConsensusImpl.getInstance().stop();
+      DataRegionConsensusImpl.getInstance().stop();
+    } catch (IOException e) {
       logger.error("Stop ConsensusImpl error in IoTDBShutdownHook", e);
     }