You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/23 11:58:41 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13] Improve the process of secondary (#6855)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new cafc7f3dde [To rel/0.13] Improve the process of secondary (#6855)
cafc7f3dde is described below

commit cafc7f3dde0c61d1f5dff4d3df26ca7350db033a
Author: ljn55966005 <32...@users.noreply.github.com>
AuthorDate: Tue Aug 23 19:58:35 2022 +0800

    [To rel/0.13] Improve the process of secondary (#6855)
---
 client-cpp/pom.xml                                 |   4 +-
 compile-tools/pom.xml                              |   6 +-
 distribution/pom.xml                               |   2 +-
 example/client-cpp-example/pom.xml                 |   2 +-
 example/trigger/pom.xml                            |   2 +-
 example/udf/pom.xml                                |   2 +-
 grafana-connector/pom.xml                          |   2 +-
 integration/pom.xml                                |   6 +-
 jdbc/pom.xml                                       |   2 +-
 pom.xml                                            |   8 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  29 +++--
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   1 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +-
 .../iotdb/db/doublelive/OperationSyncConsumer.java |  38 +++---
 .../db/doublelive/OperationSyncDDLProtector.java   |  35 +++---
 .../db/doublelive/OperationSyncDMLProtector.java   |  45 +++++++-
 .../db/doublelive/OperationSyncLogService.java     |  35 +++---
 .../db/doublelive/OperationSyncPlanTypeUtils.java  |  22 +++-
 .../iotdb/db/doublelive/OperationSyncProducer.java |  50 ++++++--
 .../db/doublelive/OperationSyncProtector.java      |   8 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 128 +++++++++++++++++----
 .../qp/physical/sys/CreateContinuousQueryPlan.java |  17 +++
 .../qp/physical/sys/DropContinuousQueryPlan.java   |   8 ++
 23 files changed, 328 insertions(+), 130 deletions(-)

diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index b6a0014588..38888eb4ba 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -116,8 +116,8 @@
                 <cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
                 <thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
                 <iotdb.server.script>start-server.bat</iotdb.server.script>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index fc4d80f6b3..afab6ff8d9 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
         <cmake-version>3.17.3</cmake-version>
         <openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
         <bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
-        <cmake.build.type />
+        <cmake.build.type/>
     </properties>
     <modules>
         <module>thrift</module>
@@ -138,8 +138,8 @@
                 <thrift.make.executable>make</thrift.make.executable>
                 <thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
                 <gradlew.executable>gradlew.bat</gradlew.executable>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
     </profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 70dd54169d..c1aaa127ef 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
     </parent>
     <artifactId>iotdb-distribution</artifactId>
     <name>IoTDB Distribution</name>
-    <modules />
+    <modules/>
     <build>
         <plugins>
             <plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index 703f8fe8ad..9a5705cb36 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -84,7 +84,7 @@
             <properties>
                 <cmake.generator>Visual Studio 16 2019</cmake.generator>
                 <cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
-                <boost.include.dir />
+                <boost.include.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/example/trigger/pom.xml b/example/trigger/pom.xml
index 64145c1682..1b16ca6041 100644
--- a/example/trigger/pom.xml
+++ b/example/trigger/pom.xml
@@ -118,7 +118,7 @@
                                 <importOrder>
                                     <order>org.apache.iotdb,,javax,java,\#</order>
                                 </importOrder>
-                                <removeUnusedImports />
+                                <removeUnusedImports/>
                             </java>
                         </configuration>
                         <executions>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index f4d18b2510..4d26ff932f 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -118,7 +118,7 @@
                                 <importOrder>
                                     <order>org.apache.iotdb,,javax,java,\#</order>
                                 </importOrder>
-                                <removeUnusedImports />
+                                <removeUnusedImports/>
                             </java>
                         </configuration>
                         <executions>
diff --git a/grafana-connector/pom.xml b/grafana-connector/pom.xml
index 4a256ad502..91b7616d35 100644
--- a/grafana-connector/pom.xml
+++ b/grafana-connector/pom.xml
@@ -170,7 +170,7 @@
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
diff --git a/integration/pom.xml b/integration/pom.xml
index 90deb1433d..d8e1e66f00 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -80,7 +80,7 @@
             <id>LocalStandalone</id>
             <properties>
                 <test.includedGroups>org.apache.iotdb.itbase.category.LocalStandaloneTest</test.includedGroups>
-                <test.excludedGroups />
+                <test.excludedGroups/>
             </properties>
             <activation>
                 <activeByDefault>true</activeByDefault>
@@ -142,7 +142,7 @@
             <id>Remote</id>
             <properties>
                 <test.includedGroups>org.apache.iotdb.itbase.category.RemoteTest</test.includedGroups>
-                <test.excludedGroups />
+                <test.excludedGroups/>
             </properties>
             <activation>
                 <activeByDefault>false</activeByDefault>
@@ -206,7 +206,7 @@
             <id>Cluster</id>
             <properties>
                 <test.includedGroups>org.apache.iotdb.itbase.category.ClusterTest</test.includedGroups>
-                <test.excludedGroups />
+                <test.excludedGroups/>
             </properties>
             <activation>
                 <activeByDefault>false</activeByDefault>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 12c6bd5dd4..5ad6473ddb 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -203,7 +203,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                     </pluginExecutions>
diff --git a/pom.xml b/pom.xml
index 0930fdcd7a..371b4c81cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
         <sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
         <!-- By default, the argLine is empty-->
         <gson.version>2.8.8</gson.version>
-        <argLine />
+        <argLine/>
         <!-- whether enable compiling the cpp client-->
         <client-cpp>false</client-cpp>
         <!-- disable enforcer by default-->
@@ -693,7 +693,7 @@
                             <importOrder>
                                 <order>org.apache.iotdb,,javax,java,\#</order>
                             </importOrder>
-                            <removeUnusedImports />
+                            <removeUnusedImports/>
                         </java>
                         <lineEndings>UNIX</lineEndings>
                     </configuration>
@@ -768,7 +768,7 @@
                         <phase>validate</phase>
                         <configuration>
                             <rules>
-                                <dependencyConvergence />
+                                <dependencyConvergence/>
                             </rules>
                         </configuration>
                         <goals>
@@ -814,7 +814,7 @@
                                 </requireJavaVersion>
                                 <!-- Disabled for now as it breaks the ability to build single modules -->
                                 <!--reactorModuleConvergence/-->
-                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
                             </rules>
                         </configuration>
                     </execution>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index adff76ea4e..997ea249d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -830,8 +830,10 @@ public class IoTDBConfig {
   private String secondaryUser = "root";
   private String secondaryPassword = "root";
 
+  private int secondarySessionPoolMaxSize = 10;
+
   // The transmitting concurrency size of operation sync SessionPool
-  private int OperationSyncSessionConcurrencySize = 8;
+  private int operationSyncSessionConcurrencySize = 8;
 
   // OperationSyncLog dir
   private String operationSyncLogDir =
@@ -845,8 +847,9 @@ public class IoTDBConfig {
 
   // OperationSyncProducer DML cache size
   private int operationSyncProducerCacheSize = 1024;
-  // OperationSyncConsumer concurrency size
-  private int operationSyncConsumerConcurrencySize = 4;
+
+  // OperationSyncProducer DML cache number
+  private int operationSyncProducerCacheNum = 8;
 
   // The max record num returned in one schema query.
   private int schemaQueryFetchSize = 10000000;
@@ -2662,12 +2665,20 @@ public class IoTDBConfig {
     this.secondaryPassword = secondaryPassword;
   }
 
+  public int getSecondarySessionPoolMaxSize() {
+    return secondarySessionPoolMaxSize;
+  }
+
+  public void setSecondarySessionPoolMaxSize(int secondarySessionPoolMaxSize) {
+    this.secondarySessionPoolMaxSize = secondarySessionPoolMaxSize;
+  }
+
   public int getOperationSyncSessionConcurrencySize() {
-    return OperationSyncSessionConcurrencySize;
+    return operationSyncSessionConcurrencySize;
   }
 
   public void setOperationSyncSessionConcurrencySize(int operationSyncSessionConcurrencySize) {
-    this.OperationSyncSessionConcurrencySize = operationSyncSessionConcurrencySize;
+    this.operationSyncSessionConcurrencySize = operationSyncSessionConcurrencySize;
   }
 
   public String getOperationSyncLogDir() {
@@ -2710,12 +2721,12 @@ public class IoTDBConfig {
     this.operationSyncProducerCacheSize = operationSyncProducerCacheSize;
   }
 
-  public int getOperationSyncConsumerConcurrencySize() {
-    return operationSyncConsumerConcurrencySize;
+  public int getOperationSyncProducerCacheNum() {
+    return operationSyncProducerCacheNum;
   }
 
-  public void setOperationSyncConsumerConcurrencySize(int operationSyncConsumerConcurrencySize) {
-    this.operationSyncConsumerConcurrencySize = operationSyncConsumerConcurrencySize;
+  public void setOperationSyncProducerCacheNum(int operationSyncProducerCacheNum) {
+    this.operationSyncProducerCacheNum = operationSyncProducerCacheNum;
   }
 
   public int getSchemaQueryFetchSize() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 21228f3cd7..1294002707 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -39,6 +39,7 @@ public class IoTDBConstant {
   public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
   public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
   public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
+  public static final String DOUBLE_LIVE_LOGGER_NAME = "DOUBLE_LIVE";
 
   public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c277e9bc6a..46789e8aa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -839,11 +839,11 @@ public class IoTDBDescriptor {
                   "operation_sync_producer_cache_size",
                   String.valueOf(conf.getOperationSyncProducerCacheSize()))));
 
-      conf.setOperationSyncConsumerConcurrencySize(
+      conf.setOperationSyncProducerCacheNum(
           Integer.parseInt(
               properties.getProperty(
-                  "operation_sync_consumer_concurrency_size",
-                  String.valueOf(conf.getOperationSyncConsumerConcurrencySize()))));
+                  "operation_sync_producer_cache_num",
+                  String.valueOf(conf.getOperationSyncProducerCacheNum()))));
 
       conf.setSchemaQueryFetchSize(
           Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
index 3f05766d53..cdb23d24eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.doublelive;
 
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -33,16 +34,16 @@ public class OperationSyncConsumer implements Runnable {
   private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
 
   private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
-      OperationSyncQueue;
+      operationSyncQueue;
   private final SessionPool operationSyncSessionPool;
   private final OperationSyncLogService dmlLogService;
 
   public OperationSyncConsumer(
       BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
-          OperationSyncQueue,
+          operationSyncQueue,
       SessionPool operationSyncSessionPool,
       OperationSyncLogService dmlLogService) {
-    this.OperationSyncQueue = OperationSyncQueue;
+    this.operationSyncQueue = operationSyncQueue;
     this.operationSyncSessionPool = operationSyncSessionPool;
     this.dmlLogService = dmlLogService;
   }
@@ -52,31 +53,28 @@ public class OperationSyncConsumer implements Runnable {
     while (true) {
       Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> head;
       ByteBuffer headBuffer;
-      OperationSyncPlanTypeUtils.OperationSyncPlanType headType;
       try {
-        head = OperationSyncQueue.take();
+        head = operationSyncQueue.take();
         headBuffer = head.left;
-        headType = head.right;
       } catch (InterruptedException e) {
         LOGGER.error("OperationSyncConsumer been interrupted: ", e);
         continue;
       }
-
-      headBuffer.position(0);
       boolean transmitStatus = false;
-      try {
-        headBuffer.position(0);
-        transmitStatus = operationSyncSessionPool.operationSyncTransmit(headBuffer);
-      } catch (IoTDBConnectionException connectionException) {
-        // warn IoTDBConnectionException and do serialization
-        LOGGER.warn(
-            "OperationSyncConsumer can't transmit because network failure", connectionException);
-      } catch (Exception e) {
-        // The PhysicalPlan has internal error, reject transmit
-        LOGGER.error("OperationSyncConsumer can't transmit", e);
-        continue;
+      if (StorageEngine.isSecondaryAlive().get()) {
+        try {
+          headBuffer.position(0);
+          transmitStatus = operationSyncSessionPool.operationSyncTransmit(headBuffer);
+        } catch (IoTDBConnectionException connectionException) {
+          // warn IoTDBConnectionException and do serialization
+          LOGGER.warn(
+              "OperationSyncConsumer can't transmit because network failure", connectionException);
+        } catch (Exception e) {
+          // The PhysicalPlan has internal error, reject transmit
+          LOGGER.error("OperationSyncConsumer can't transmit", e);
+          continue;
+        }
       }
-
       if (!transmitStatus) {
         try {
           // must set buffer position to limit() before serialization
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
index b2e70c5042..1870145980 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.doublelive;
 
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.session.pool.SessionPool;
@@ -30,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 public class OperationSyncDDLProtector extends OperationSyncProtector {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);;
 
   private final SessionPool operationSyncSessionPool;
 
@@ -49,20 +50,26 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
     while (true) {
       // transmit E-Plan until it's been received
       boolean transmitStatus = false;
-
-      try {
-        // try operation sync
-        planBuffer.position(0);
-        transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
-      } catch (IoTDBConnectionException connectionException) {
-        // warn IoTDBConnectionException and retry
-        LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
-      } catch (Exception e) {
-        // error exception and break
-        LOGGER.error("OperationSyncDDLProtector can't transmit", e);
-        break;
+      if (StorageEngine.isSecondaryAlive().get()) {
+        try {
+          // try operation sync
+          planBuffer.position(0);
+          transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
+        } catch (IoTDBConnectionException connectionException) {
+          // warn IoTDBConnectionException and retry
+          LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+        } catch (Exception e) {
+          // error exception and break
+          LOGGER.error("OperationSyncDDLProtector can't transmit", e);
+          break;
+        }
+      } else {
+        try {
+          TimeUnit.SECONDS.sleep(30);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
       }
-
       if (transmitStatus) {
         break;
       } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
index 5668d918f7..2c2c89ef94 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.db.doublelive;
 
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
@@ -27,13 +29,13 @@ import java.util.concurrent.TimeUnit;
 public class OperationSyncDMLProtector extends OperationSyncProtector {
 
   private final OperationSyncDDLProtector ddlProtector;
-  private final OperationSyncProducer producer;
+  private final SessionPool operationSyncSessionPool;
 
   public OperationSyncDMLProtector(
-      OperationSyncDDLProtector ddlProtector, OperationSyncProducer producer) {
+      OperationSyncDDLProtector ddlProtector, SessionPool operationSyncSessionPool) {
     super();
     this.ddlProtector = ddlProtector;
-    this.producer = producer;
+    this.operationSyncSessionPool = operationSyncSessionPool;
   }
 
   @Override
@@ -49,7 +51,38 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
 
   @Override
   protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
-    producer.put(
-        new Pair<>(planBuffer, OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan)));
+    while (true) {
+      // transmit E-Plan until it's been received
+      boolean transmitStatus = false;
+      if (StorageEngine.isSecondaryAlive().get()) {
+        try {
+          // try operation sync
+          planBuffer.position(0);
+          transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
+        } catch (IoTDBConnectionException connectionException) {
+          // warn IoTDBConnectionException and retry
+          LOGGER.warn("OperationSyncDMLProtector can't transmit, retrying...", connectionException);
+        } catch (Exception e) {
+          // error exception and break
+          LOGGER.error("OperationSyncDMLProtector can't transmit", e);
+          break;
+        }
+      } else {
+        try {
+          TimeUnit.SECONDS.sleep(30);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      if (transmitStatus) {
+        break;
+      } else {
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException e) {
+          LOGGER.warn("OperationSyncDMLProtector is interrupted", e);
+        }
+      }
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
index a11d3fee28..dc061c167e 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
@@ -38,13 +38,13 @@ public class OperationSyncLogService implements Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncLogService.class);
 
-  private static final String logFileDir =
+  private static final String LOG_FILE_DIR =
       IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogDir();
-  private static final long logFileValidity =
+  private static final long LOG_FILE_VALIDITY =
       IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity() * 1000L;
-  private static final int maxLogFileNum =
+  private static final int MAX_LOG_FILE_NUM =
       IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogNum();
-  private static final long maxLogFileSize =
+  private static final long MAX_LOG_FILE_SIZE =
       IoTDBDescriptor.getInstance().getConfig().getOperationSyncMaxLogSize();
 
   private static long currentLogFileSize = 0;
@@ -65,7 +65,7 @@ public class OperationSyncLogService implements Runnable {
     this.logFile = null;
     this.logWriter = null;
 
-    File logDir = new File(logFileDir);
+    File logDir = new File(LOG_FILE_DIR);
     if (!logDir.exists()) {
       if (!logDir.mkdirs()) {
         LOGGER.error("Can't make OperationSyncLog file dir: {}", logDir.getAbsolutePath());
@@ -77,9 +77,9 @@ public class OperationSyncLogService implements Runnable {
   public void run() {
     // Check if there exists remnant logs
     List<Integer> logFileIDList = new ArrayList<>();
-    for (int ID = 0; ID < maxLogFileNum; ID++) {
+    for (int ID = 0; ID < MAX_LOG_FILE_NUM; ID++) {
       File file =
-          SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + logFileName + ID);
+          SystemFileFactory.INSTANCE.getFile(LOG_FILE_DIR + File.separator + logFileName + ID);
       if (file.exists()) {
         logFileIDList.add(ID);
       }
@@ -96,10 +96,12 @@ public class OperationSyncLogService implements Runnable {
       }
 
       for (int i = firstID; i < logFileIDList.size(); i++) {
-        protector.registerLogFile(logFileDir + File.separator + logFileName + logFileIDList.get(i));
+        protector.registerLogFile(
+            LOG_FILE_DIR + File.separator + logFileName + logFileIDList.get(i));
       }
       for (int i = 0; i < firstID; i++) {
-        protector.registerLogFile(logFileDir + File.separator + logFileName + logFileIDList.get(i));
+        protector.registerLogFile(
+            LOG_FILE_DIR + File.separator + logFileName + logFileIDList.get(i));
       }
 
       int nextID;
@@ -108,7 +110,7 @@ public class OperationSyncLogService implements Runnable {
       } else {
         nextID = logFileIDList.get(firstID - 1) + 1;
       }
-      logFileID = nextID % maxLogFileNum;
+      logFileID = nextID % MAX_LOG_FILE_NUM;
     } else {
       logFileID = 0;
     }
@@ -117,7 +119,8 @@ public class OperationSyncLogService implements Runnable {
       // Check the validity of logFile
       logWriterLock.lock();
       try {
-        if (logWriter != null && System.currentTimeMillis() - logFileCreateTime > logFileValidity) {
+        if (logWriter != null
+            && System.currentTimeMillis() - logFileCreateTime > LOG_FILE_VALIDITY) {
           // Submit logFile when it's expired
           submitLogFile();
         }
@@ -161,10 +164,10 @@ public class OperationSyncLogService implements Runnable {
     }
 
     protector.registerLogFile(
-        logFileDir
+        LOG_FILE_DIR
             + File.separator
             + logFileName
-            + (logFileID - 1 + maxLogFileNum) % maxLogFileNum);
+            + (logFileID - 1 + MAX_LOG_FILE_NUM) % MAX_LOG_FILE_NUM);
 
     logWriter = null;
     logFile = null;
@@ -172,7 +175,7 @@ public class OperationSyncLogService implements Runnable {
 
   private void createLogFile() {
     logFile =
-        SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + logFileName + logFileID);
+        SystemFileFactory.INSTANCE.getFile(LOG_FILE_DIR + File.separator + logFileName + logFileID);
     while (true) {
       try {
         if (logFile.createNewFile()) {
@@ -190,7 +193,7 @@ public class OperationSyncLogService implements Runnable {
         }
       }
     }
-    logFileID = (logFileID + 1) % maxLogFileNum;
+    logFileID = (logFileID + 1) % MAX_LOG_FILE_NUM;
   }
 
   public static synchronized void incLogFileSize(long size) {
@@ -202,7 +205,7 @@ public class OperationSyncLogService implements Runnable {
   }
 
   public void write(ByteBuffer buffer) throws IOException {
-    if (currentLogFileSize < maxLogFileSize) {
+    if (currentLogFileSize < MAX_LOG_FILE_SIZE) {
       if (logWriter == null) {
         // Create logFile when there are no valid
         createLogFile();
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
index 774cb3e2e1..dccd4db016 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
@@ -24,23 +24,32 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeactivateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 
 public class OperationSyncPlanTypeUtils {
 
   public static OperationSyncPlanType getOperationSyncPlanType(PhysicalPlan plan) {
-    if (plan instanceof SetStorageGroupPlan
+    if (plan instanceof DeletePlan || plan instanceof InsertPlan) {
+      return OperationSyncPlanType.DMLPlan;
+    } else if (plan instanceof SetStorageGroupPlan
         || plan instanceof DeleteStorageGroupPlan
         || plan instanceof CreateTimeSeriesPlan
         || plan instanceof CreateMultiTimeSeriesPlan
@@ -54,10 +63,15 @@ public class OperationSyncPlanTypeUtils {
         || plan instanceof DropTemplatePlan
         || plan instanceof PruneTemplatePlan
         || plan instanceof SetTemplatePlan
-        || plan instanceof UnsetTemplatePlan) {
+        || plan instanceof UnsetTemplatePlan
+        || plan instanceof SetTTLPlan
+        || plan instanceof CreateContinuousQueryPlan
+        || plan instanceof DataAuthPlan
+        || plan instanceof DropContinuousQueryPlan
+        || plan instanceof ChangeAliasPlan
+        || plan instanceof ChangeTagOffsetPlan
+        || plan instanceof AuthorPlan) {
       return OperationSyncPlanType.DDLPlan;
-    } else if (plan instanceof DeletePlan || plan instanceof InsertPlan) {
-      return OperationSyncPlanType.DMLPlan;
     }
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
index 2f23b97e05..fbaf0ae463 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 
 /**
@@ -33,22 +35,48 @@ import java.util.concurrent.BlockingQueue;
 public class OperationSyncProducer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProducer.class);
-
-  private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
-      operationSyncQueue;
+  private final ArrayList<
+          BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
+      operationSyncQueues;
+  private final OperationSyncLogService dmlLogService;
 
   public OperationSyncProducer(
-      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
-          operationSyncQueue) {
-    this.operationSyncQueue = operationSyncQueue;
+      ArrayList<BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
+          operationSyncQueue,
+      OperationSyncLogService operationSyncDMLLogService) {
+    this.operationSyncQueues = operationSyncQueue;
+    this.dmlLogService = operationSyncDMLLogService;
   }
 
-  public void put(Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> planPair) {
+  public void put(
+      Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> planPair,
+      String deviceName) {
+
+    ByteBuffer headBuffer;
+    headBuffer = planPair.left;
+    headBuffer.position(0);
+    int index;
+    if (deviceName == null) {
+      index = operationSyncQueues.size() - 1;
+    } else {
+      try {
+        index = Math.abs(deviceName.hashCode()) % (operationSyncQueues.size() - 1);
+      } catch (Exception e) {
+        index = 0;
+      }
+    }
+    if (operationSyncQueues.get(index).offer(planPair)) {
+      return;
+    }
     try {
-      planPair.left.position(0);
-      operationSyncQueue.put(planPair);
-    } catch (InterruptedException e) {
-      LOGGER.error("OperationSync cache failed.", e);
+      // must set buffer position to limit() before serialization
+      headBuffer.position(headBuffer.limit());
+      dmlLogService.acquireLogWriter();
+      dmlLogService.write(headBuffer);
+    } catch (IOException e) {
+      LOGGER.error("OperationSyncConsumer can't serialize physicalPlan", e);
+    } finally {
+      dmlLogService.releaseLogWriter();
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
index a9ff399c3b..c18af522c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.ReentrantLock;
 public abstract class OperationSyncProtector implements Runnable {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProtector.class);
-  protected static final int logFileValidity =
+  protected static final int LOG_FILE_VALIDITY =
       IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity();
 
   // For transmit log files
@@ -105,7 +105,7 @@ public abstract class OperationSyncProtector implements Runnable {
 
       try {
         // Sleep a while before next check
-        TimeUnit.SECONDS.sleep(logFileValidity);
+        TimeUnit.SECONDS.sleep(LOG_FILE_VALIDITY);
       } catch (InterruptedException e) {
         LOGGER.warn("OperationSyncProtector been interrupted", e);
       }
@@ -126,7 +126,7 @@ public abstract class OperationSyncProtector implements Runnable {
             e);
         continue;
       }
-
+      LOGGER.info("begin trans " + logFileName);
       while (logReader.hasNext()) {
         // read and re-serialize the PhysicalPlan
         PhysicalPlan nextPlan = logReader.next();
@@ -140,7 +140,7 @@ public abstract class OperationSyncProtector implements Runnable {
         protectorByteStream.reset();
         transmitPhysicalPlan(nextBuffer, nextPlan);
       }
-
+      LOGGER.info("end trans " + logFileName);
       logReader.close();
       try {
         // sleep one second then delete OperationSyncLog
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 034773b55a..9d80720866 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -66,13 +66,20 @@ import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.metrics.enums.Metric;
+import org.apache.iotdb.db.service.metrics.enums.Tag;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.session.util.SystemStatus;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -105,23 +112,26 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 public class StorageEngine implements IService {
   private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
-
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
+  private static final long HEARTBEAT_CHECK_INTERVAL = 30L;
+  private static final long SECONDARY_METRIC_INTERVAL = 30L;
 
   /* OperationSync module */
   private static final boolean isEnableOperationSync =
       IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
-  private static SessionPool operationSyncsessionPool;
+  private static SessionPool operationSyncSessionPool;
   private static OperationSyncProducer operationSyncProducer;
   private static OperationSyncDDLProtector operationSyncDDLProtector;
   private static OperationSyncLogService operationSyncDDLLogService;
+  private static AtomicBoolean isSecondaryAlive = new AtomicBoolean(false);
 
   /**
    * Time range for dividing storage group, the time unit is the same with IoTDB's
@@ -156,55 +166,81 @@ public class StorageEngine implements IService {
   // add customized listeners here for flush and close events
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
+  ArrayList<BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
+      arrayListBlockQueue;
 
   private StorageEngine() {
     if (isEnableOperationSync) {
-      /* Open OperationSync */
+      // Open OperationSync
       IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      int cacheNum = config.getOperationSyncProducerCacheNum();
+
       // create SessionPool for OperationSync
-      operationSyncsessionPool =
+      operationSyncSessionPool =
           new SessionPool(
               config.getSecondaryAddress(),
               config.getSecondaryPort(),
               config.getSecondaryUser(),
               config.getSecondaryPassword(),
-              5,
+              config.getSecondarySessionPoolMaxSize(),
               config.isRpcThriftCompressionEnable());
+      ThreadPoolExecutor threadPool =
+          new ThreadPoolExecutor(
+              cacheNum + 5,
+              cacheNum + 5,
+              3,
+              TimeUnit.SECONDS,
+              new ArrayBlockingQueue<>(5),
+              new ThreadPoolExecutor.AbortPolicy());
 
       // create operationSyncDDLProtector and operationSyncDDLLogService
-      operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
-      new Thread(operationSyncDDLProtector).start();
+      ScheduledExecutorService secondaryCheckThread =
+          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("secondary-Check");
+
+      secondaryCheckThread.scheduleAtFixedRate(
+          this::checkSecondaryIsLife, 0L, HEARTBEAT_CHECK_INTERVAL, TimeUnit.SECONDS);
+
+      operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncSessionPool);
+      threadPool.execute(operationSyncDDLProtector);
       operationSyncDDLLogService =
           new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
-      new Thread(operationSyncDDLLogService).start();
+      threadPool.execute(operationSyncDDLLogService);
 
       // create OperationSyncProducer
-      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
-          blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
-      operationSyncProducer = new OperationSyncProducer(blockingQueue);
-
-      // create OperationSyncDMLProtector and OperationSyncDMLLogService
+      arrayListBlockQueue = new ArrayList<>(cacheNum);
+      for (int i = 0; i < cacheNum; i++) {
+        BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+            blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+        arrayListBlockQueue.add(blockingQueue);
+      }
       OperationSyncDMLProtector operationSyncDMLProtector =
-          new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
-      new Thread(operationSyncDMLProtector).start();
+          new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncSessionPool);
+      threadPool.execute(operationSyncDMLProtector);
       OperationSyncLogService operationSyncDMLLogService =
           new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
-      new Thread(operationSyncDMLLogService).start();
-
+      operationSyncProducer =
+          new OperationSyncProducer(arrayListBlockQueue, operationSyncDMLLogService);
+      // create OperationSyncDMLProtector and OperationSyncDMLLogService
+      threadPool.execute(operationSyncDMLLogService);
       // create OperationSyncConsumer
-      for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
-        OperationSyncConsumer consumer =
+      for (int i = 0; i < cacheNum; i++) {
+        threadPool.execute(
             new OperationSyncConsumer(
-                blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
-        new Thread(consumer).start();
+                arrayListBlockQueue.get(i), operationSyncSessionPool, operationSyncDMLLogService));
+      }
+      if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
+        ScheduledExecutorService secondaryMetricThread =
+            IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("secondary-Metric");
+        secondaryMetricThread.scheduleAtFixedRate(
+            this::secondaryMetric, 0L, SECONDARY_METRIC_INTERVAL, TimeUnit.SECONDS);
       }
-
       logger.info("Successfully initialize OperationSync!");
     } else {
-      operationSyncsessionPool = null;
+      operationSyncSessionPool = null;
       operationSyncProducer = null;
       operationSyncDDLProtector = null;
       operationSyncDDLLogService = null;
+      arrayListBlockQueue = null;
     }
   }
 
@@ -246,14 +282,14 @@ public class StorageEngine implements IService {
         OperationSyncWriteTask ddlTask =
             new OperationSyncWriteTask(
                 buffer,
-                operationSyncsessionPool,
+                operationSyncSessionPool,
                 operationSyncDDLProtector,
                 operationSyncDDLLogService);
         ddlTask.run();
         break;
       case DMLPlan:
         // Put into OperationSyncProducer
-        operationSyncProducer.put(new Pair<>(buffer, planType));
+        operationSyncProducer.put(new Pair<>(buffer, planType), getDeviceNameByPlan(physicalPlan));
     }
   }
 
@@ -273,6 +309,25 @@ public class StorageEngine implements IService {
     return result;
   }
 
+  public static String getDeviceNameByPlan(PhysicalPlan plan) {
+    if (plan instanceof InsertPlan) {
+      InsertPlan physicalPlan = (InsertPlan) plan;
+      if (physicalPlan.getDevicePath() == null) {
+        if (physicalPlan.getPaths() != null && physicalPlan.getPaths().size() > 0) {
+          return physicalPlan.getPaths().get(0).getDevice();
+        } else {
+          return null;
+        }
+      }
+      return physicalPlan.getDevicePath().getDevice();
+    }
+    return null;
+  }
+
+  public static AtomicBoolean isSecondaryAlive() {
+    return isSecondaryAlive;
+  }
+
   public static long getTimePartitionInterval() {
     if (timePartitionInterval == -1) {
       initTimePartition();
@@ -393,6 +448,29 @@ public class StorageEngine implements IService {
     startTimedService();
   }
 
+  private void checkSecondaryIsLife() {
+    try {
+      isSecondaryAlive.set(operationSyncSessionPool.getSystemStatus() == SystemStatus.NORMAL);
+    } catch (IoTDBConnectionException e) {
+      isSecondaryAlive.set(false);
+    }
+  }
+
+  private void secondaryMetric() {
+    for (int i = 0; i < arrayListBlockQueue.size(); i++) {
+      MetricsService.getInstance()
+          .getMetricManager()
+          .getOrCreateGauge(
+              Metric.QUEUE.toString(),
+              MetricLevel.IMPORTANT,
+              Tag.NAME.toString(),
+              "OperationSyncProducerQueueSize" + i,
+              Tag.STATUS.toString(),
+              "running")
+          .set(arrayListBlockQueue.get(i).size());
+    }
+  }
+
   private void checkTTL() {
     try {
       IoTDB.metaManager.checkTTLOnLastCache();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
index 79ad566f59..9663da20fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
@@ -140,6 +142,21 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
     buffer.putLong(firstExecutionTimeBoundary);
   }
 
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.write((byte) PhysicalPlanType.CREATE_CONTINUOUS_QUERY.ordinal());
+    ReadWriteIOUtils.write(continuousQueryName, stream);
+    ReadWriteIOUtils.write(querySql, stream);
+    ReadWriteIOUtils.write(querySqlBeforeGroupByClause, stream);
+    ReadWriteIOUtils.write(querySqlAfterGroupByClause, stream);
+    ReadWriteIOUtils.write(targetPath.getFullPath(), stream);
+    stream.writeLong(everyInterval);
+    stream.writeLong(forInterval);
+    stream.writeLong(groupByTimeInterval);
+    ReadWriteIOUtils.write(groupByTimeIntervalString, stream);
+    stream.writeLong(firstExecutionTimeBoundary);
+  }
+
   @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     continuousQueryName = ReadWriteIOUtils.readString(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
index 463b03c408..5d273c2e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -60,4 +62,10 @@ public class DropContinuousQueryPlan extends PhysicalPlan {
   public void deserialize(ByteBuffer buffer) {
     continuousQueryName = ReadWriteIOUtils.readString(buffer);
   }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.write((byte) PhysicalPlanType.DROP_CONTINUOUS_QUERY.ordinal());
+    ReadWriteIOUtils.write(continuousQueryName, stream);
+  }
 }